elk有什么elk插件下载可以实现通过规则来进行邮件报警

 上传我的文档
 下载
 收藏
该文档贡献者很忙,什么也没留下。
 下载此文档
elk项目总结
下载积分:3000
内容提示:elk项目总结
文档格式:DOCX|
浏览次数:2|
上传日期: 20:19:19|
文档星级:
全文阅读已结束,如果下载本文需要使用
 3000 积分
下载此文档
该用户还上传了这些文档
elk项目总结
关注微信公众号当前位置: >>
如何使用ELK栈工具来管理系统日志
摘要:我们曾经介绍过如何跨越多种设备搭建一个开源的监控设备日志的环境。接下来,下一步是要搭建云服务器来发送系统日志到中央的ELK服务器,在那里日志会被收集起来并用来分析与系统相关的新信息。
  我们曾经介绍过如何跨越多种设备搭建一个开源的监控设备日志的环境。接下来,下一步是要搭建云服务器来发送系统日志到中央的ELK服务器,在那里日志会被收集起来并用来分析与系统相关的新信息。
  应用和守护进程过去常常会将日志信息写到在.log文件来通知系统管理员有某些问题或行为发生。这里最主要的问题是日志格式的不规范,从而导致了效率低下,难以管理的日志解析与分析。当系统日志出现后,这其中的很多问题都得到了解决,但安全的问题依旧被忽略了,系统日志仍然没有被集中管理。
  现在,有一些云服务可以集中管理所有的单独系统里的系统日志。一名对云系统有访问权限的攻击者可以篡改系统日志来隐藏他的存在。为了防止这种可能性,我们可以使用一个集中管理的云日志服务器,在那里所有其他云服务器的日志都可以用来分析可能的恶意行为。
  在这篇的例子里,我们将使用一个ELK栈来提供一个用于集中管理和系统日志分析的环境。ELK栈是由三部分组成:即,Elasticsearch用于存储和索引日志,Logstash用于解析日志,以及Kibana做为搜索和分析系统日志的界面。
  环境介绍
  该环境可以使用Docker容器来搭建,在这个例子中是使用由Protean Security所提供的Docker容器。使用Docker
run命令来下载proteansec/elk镜像并forward几个端口。Logstash监听在TCP/UDP的5514端口上,负责接受和分析系统日志消息。Elasticsearch则监听端口,但这些端口不需要对外曝露,因为只有Logstash和Kibana会直接使用Elasticsearch。Kibana的Web界面跑在TCP的5601端口上,可用于搜索和分析日志。
  在建立环境之后,通过Docker镜像的主机IP地址加端口5601连接到Kibana的Web界面,以搜索和分析所有收集到的日志。
  要在一个集中管理的Docker容器中收集日志,配置每台云服务器发送日志到暴露的5514端口上。这可以通过在云服务器上安装rsyslog并在rsyslog.d目录中添加另一个配置文件,指明从每个系统日志子系统发送的每条消息都应该被发送到一个Docker容器的5514端口上。你需要创建另一个文件/etc/rsyslog.d/10-logstash.conf,具体内容为:&*.*
@@docker:5514&,其中Docker是一个DNS可解析的容器主机名。
  如果有好几个云系统都可以经由SSH访问,那么我们可以通过引入一个简单的Python库来自动化该过程。我们可以通过SSH发送命令来自动化部署和管理任务,进而能够在远程系统上运行指令。
  重要事件的检测
  OpenVPN应用会在用户认证成功后生成日志。在渗透测试和安全评估中,我们偶然的发现了有各种用于VPN服务器验证的证书和凭证。借由获得所有的必要文件,攻击者可以建立连到公司网络的VPN连接,并访问内部资源,这是特别危险的。为了检测这种行为,搭建一个ELK栈监控连接到服务器的VPN连接来识别用户,以及用户连接来源的IP地址。当一个用户从某个遥远的国家的未知IP连接过来,这类的事件可以被检测到并通报给一个安全专业人士,其可以验证该连接是否是恶意的。
  当用户在服务器上认证成功时,OpenVPN服务器会产生以下事件:
  [plain]&29&Aug 21 15:01:32 openvpn[9739]: 1.2.3.4:39236
[name.surname] Peer Connection Initiated with [AF_INET]1.2.3.4:39236[/plain]
  该日志消息已经被推送到ELK栈,在那里Logstash会解析它,但目前还没有相应的规则来从中提取出用户名和IP地址。要编写适当的规则,使用Grok
Debugger服务,该服务接受系统日志和规则,并根据规则来解析消息。
  下面显示的是一条适当的规则将如何解析消息,其中用户名被存储在&vpnuser&变量中,主机名存在&vpnhost&变量中:
  在Kibana界面中,经过解析的消息包含了字段vpnhost(远程用户的IP地址),vpnport,或远程用户端口,以及vpnuser(授权用户的ID),如下图所示:
  要在消息中增加地理信息,可以在Logstash中启用GeoIP。这会自动在每一条包含vpnhost
IP地址的消息中添加经纬度。一些额外的地理信息字段将被加到适当的消息中,用以识别VPN会话建立的国家。
  我们还可以很容易地新建一个地图来显示所有连接的VPN用户的IP地址,并在Kibana中呈现出来。下图显示了一个来自斯洛文尼亚的IP,在该图中任何新增加的VPN会话的IP会很容易分辨出来:
  集中控制的日志管理系统:企业安全必备
  在任何时候,每个系统都会生成一些事件来反应当前正在进行的操作的一些信息。系统日志用于将某些相关操作的信息传达给系统管理员。
  妥善处理用户认证时生成的OpenVPN系统日志信息很重要。需要写入适当的Logstash规则来解析消息以提取敏感信息-例如用户名,以及用户连接来源的IP地址。IP地址可进一步用于检测VPN连接建立的来源国。
  如果事件发生时员工是在另一个国家,安全警报会被触发通知安全专业人士这是一次可能的攻击。这样的事件可能发生在两种情况下。第一种用例是,用户从一次会议或在度假时连接到VPN,这不应该被视为安全问题。第二种情况涉及到黑客获得该用户的私钥和密码凭证连接到企业VPN服务器,这对公司来说可能是毁灭性的,因为它难以检测,但用上述的工具和技术则有可能做到。
  为了将所有的系统日志都存放到一个地方,拥有一个集中控制的日志管理系统对于每一个现代企业来说是必须的。这对于发现可能的异常和威胁是必要的,否则这些异常和威胁可能几周甚至几个月都不会有人注意到。
  责任编辑:DJ编辑
扫一扫,订阅更多数据中心资讯
本文地址: 网友评论:
pubajax('/comment.aspx','id=&commCount=1&ChID=0&Today=0','gCount3870');条 阅读次数:pubajax('/click.aspx','id=','click_');
版权声明:凡本站原创文章,未经授权,禁止转载,否则追究法律责任。
·····
function GetCommentList(page)
var Action='id=&ChID=0&CommentType=GetCommentList&page='+
var options={
method:'get',
parameters:Action,
onComplete:function(transport)
var returnvalue=transport.responseT
if (returnvalue.indexOf("??")>-1)
document.getElementById("Div_CommentList").innerHTML='加载评论列表失败';
document.getElementById("Div_CommentList").innerHTML=
Ajax.Request('/comment.aspx?no-cache='+Math.random(),options);
GetCommentList(1);
function GetAddCommentForm()
var Action='id=&ChID=0&CommentType=GetAddCommentForm';
var options={
method:'get',
parameters:Action,
onComplete:function(transport)
var returnvalue=transport.responseT
var arrreturnvalue=returnvalue.split('$$$');
if (arrreturnvalue[0]=="ERR")
document.getElementById("Div_CommentForm").innerHTML='加载评论表单失败!';
document.getElementById("Div_CommentForm").innerHTML=arrreturnvalue[1];
Ajax.Request('/comment.aspx?no-cache='+Math.random(),options);
GetAddCommentForm();
function CommandSubmit(obj)
if(obj.UserNum.value=="")
obj.UserNum.value="Guest";
if(obj.Content.value=="")
alert('评论内容不能为空');
var commtypevalue = '2';
for(var i=0;i<r.i++)
if(r[i].checked)
commtypevalue=r[i].
var Action='CommentType=AddComment&UserNum='+escape(obj.UserNum.value)+'&UserPwd='+escape(obj.UserPwd.value)+'&commtype='+escape(commtypevalue)+'&Content='+escape(obj.Content.value)+'&IsQID='+escape(obj.IsQID.value)+'&id=&Chid=0';
var options={
method:'get',
parameters:Action,
onComplete:function(transport)
var returnvalue=transport.responseT
var arrreturnvalue=returnvalue.split('$$$');
if (arrreturnvalue[0]=="ERR")
alert(arrreturnvalue[1]);
GetAddCommentForm();
alert('发表评论成功!');
document.getElementById("Div_CommentList").innerHTML=arrreturnvalue[1];
GetAddCommentForm();
Ajax.Request('/comment.aspx?no-cache='+Math.random(),options);
function CommentLoginOut()
var Action='CommentType=LoginOut';
var options={
method:'get',
parameters:Action,
onComplete:function(transport)
var returnvalue=transport.responseT
var arrreturnvalue=returnvalue.split('$$$');
if (arrreturnvalue[0]=="ERR")
alert('未知错误!');
document.getElementById("Div_CommentForm").innerHTML=arrreturnvalue[1];
Ajax.Request('/comment.aspx?no-cache='+Math.random(),options);
window._bd_share_config={"common":{"bdSnsKey":{},"bdText":"","bdMini":"2","bdMiniList":false,"bdPic":"","bdStyle":"0","bdSize":"32"},"share":{},"image":{"viewList":["weixin","tsina","tqq","bdhome","sqq","bdysc"],"viewText":"分享到:","viewSize":"32"},"selectShare":{"bdContainerClass":null,"bdSelectMiniList":["weixin","tsina","tqq","bdhome","sqq","bdysc"]}};with(document)0[(getElementsByTagName('head')[0]||body).appendChild(createElement('script')).src='http://bdimg./static/api/js/share.js?v=.js?cdnversion='+~(-new Date()/36e5)];
中国数据中心工作组CDCC年度论坛
近年来,我国智慧城市建设开展得
随着互联网特别是移动互联网的快
主题为“精简IT,敏捷商道” 的
半导体致冷器是由半导体所组成的
“混合云”这个词内涵外延宽泛,sudo yum update
python版本最好2.7版本以上(现在centos7好像都能满足)
2.安装elastalert(按照官网的操作走)
先切换到自己的工作目录
我这里是 /usr/local/dev/
git 克隆源码
git clone https:
cd elastalert
sudo python setup.py install
Running blist-1.3.6/setup.py -q bdist_egg –dist-dir /tmp/easy_install-Gc6gbe/blist-1.3.6/egg-dist-tmp-Ik7LL2
The required version of setuptools (&=1.1.6) is not available,
and can’t be installed while this script is running. Please
install a more recent version first, using
‘easy_install -U setuptools’.
(Currently using setuptools 0.9.8 (/usr/lib/python2.7/site-packages))
ok按照它提示的,,,执行:
sudo easy_install -U setuptools
再执行安装(sudo python setup.py install),又出错了
warning: no files found matching 'blist.rst'
blist/_blist.c:38:20: 致命错误:Python.h:没有那个文件或目录
这其实是缺少开发包,执行命令:
sudo yum install python-devel
再执行安装(sudo python setup.py install)
到这里基本能安装成功。
有时候会报一些包找不到的错误,可能是网络问题,重复执行安装命令即可(我在阿里云上遇到了)。
继续执行安装命令
pip install -r requirements.txt
找不到pip ,先安装pip并更新
sudo yum -y install python-pip
sudo pip install --upgrade pip
安装完继续
sudo pip install -r requirements.txt
在elasticsearch中创建elastalert的日志索引
sudo elastalert-create-index
根据自己的情况,填入elasticsearch的相关信息,关于
elastalert_status部分直接回车默认的即可。
如下所示:
Enter elasticsearch host: log.
Enter elasticsearch port: 9200
Use SSL? t/f: t
Enter optional basic-auth username (or leave blank): es_admin
Enter optional basic-auth password (or leave blank):
Enter optional Elasticsearch URL prefix (prepends a string to the URL of every request):
New index name? (Default elastalert_status)
Name of existing index to copy? (Default None)
/usr/lib/python2.7/site-packages/requests/packages/urllib3/connectionpool.py:734: InsecureRequestWarning: Unverified HTTPS request is being made. Adding certificate verification is strongly advised. See: https:
InsecureRequestWarning)
/usr/lib/python2.7/site-packages/requests/packages/urllib3/connectionpool.py:734: InsecureRequestWarning: Unverified HTTPS request is being made. Adding certificate verification is strongly advised. See: https:
InsecureRequestWarning)
/usr/lib/python2.7/site-packages/requests/packages/urllib3/connectionpool.py:734: InsecureRequestWarning: Unverified HTTPS request is being made. Adding certificate verification is strongly advised. See: https:
InsecureRequestWarning)
/usr/lib/python2.7/site-packages/requests/packages/urllib3/connectionpool.py:734: InsecureRequestWarning: Unverified HTTPS request is being made. Adding certificate verification is strongly advised. See: https:
InsecureRequestWarning)
/usr/lib/python2.7/site-packages/requests/packages/urllib3/connectionpool.py:734: InsecureRequestWarning: Unverified HTTPS request is being made. Adding certificate verification is strongly advised. See: https:
InsecureRequestWarning)
/usr/lib/python2.7/site-packages/requests/packages/urllib3/connectionpool.py:734: InsecureRequestWarning: Unverified HTTPS request is being made. Adding certificate verification is strongly advised. See: https:
InsecureRequestWarning)
/usr/lib/python2.7/site-packages/requests/packages/urllib3/connectionpool.py:734: InsecureRequestWarning: Unverified HTTPS request is being made. Adding certificate verification is strongly advised. See: https:
InsecureRequestWarning)
New index elastalert_status created
3.创建配置文件
sudo cp config.yaml.example config.yaml
sudo vi config.yaml
根据自己的具体情况进行修改
比如我这里的elasticsearch是ssl 并需要用户登陆的,配置大致如下:
run_every:
minutes: 1
buffer_time:
minutes: 15
es_host: log.example.com
es_port: 9200
use_ssl: True
es_send_get_body_as: GET
es_username: es_admin
es_password: es_password
writeback_index: elastalert_status
alert_time_limit:
以同样的方式配置规则
cd example_rules/
sudo cp example_frequency.yaml my_rule.yaml
sudo vi my_rule.yaml
这里就是根据具体的elasticsearch的信息进行配置
es_host: log.example.com
es_port: 9200
use_ssl: True
es_username: es_admin
es_password: es_password
#name属性要求唯一,这里最好能标示自己的产品
name: My-Product Exception Alert
#类型,我选择任何匹配的条件都发送邮件警告
#需要监控的索引,支持通配
index: logstash-*
#下面两个随意配置
num_events: 50
timeframe:
#根据条件进行过滤查询(这里我只要出现异常的日志,并且排除业务异常(自定义异常))
query_string:
query: "message: *exception* AND message: (!*BusinessException*) AND message: (!*ServiceException*)"
#email的警告方式
#增加邮件内容,这里我附加一个日志访问路径
alert_text: "Ref Log https://log.:5601/app/kibana"
#SMTP协议的邮件服务器相关配置(我这里是腾讯企业邮箱)
smtp_host: smtp.exmail.qq.com
smtp_port: 25
#用户认证文件,需要user和password两个属性
smtp_auth_file: smtp_auth_file.yaml
email_reply_to: no-reply@example.com
from_addr: no-reply@example.com
#需要接受邮件的邮箱地址列表
接下来创建smtp_auth_file.yaml
sudo touch smtp_auth_file.yaml
sudo vi smtp_auth_file.yaml
##配置文件内容
user: "no-"
password: "password"
现在可以简单测试一下配置文件是否正确
sudo elastalert-test-rule ./my_rule.yaml
如果有问题,就检查配置文件。
然后启动运行一下试试:
- . -- -- .
然后故意产生一下异常试试,比如
throw new NullPointerException("测试日志异常监控服务");
执行一下,一般没啥问题,邮件应该就能收到了,有问题就检查配置文件。
4.我是要监控服务,所以要让elastalert以服务的形式运行
首先创建elastalert的配置目录和相关文件
sudo mkdir /etc/elastalert
cd /etc/elastalert
-- 复制配置文件
sudo cp /usr/local/dev/elastalert/config.yaml config.yaml
sudo mkdir rules
-- 复制规则文件
sudo cp /usr/local/dev/elastalert/example_rules/my_rule.yaml my_rule.yaml
-- 复制邮件用户认证文件
sudo cp /usr/local/dev/elastalert/example_rules/smtp_auth_file.yaml smtp_auth_file.yaml
接下来修改配置文件
修改 config.yaml 中
rules_folder: /etc/elastalert/rules
修改 my_rule.yaml中
smtp_auth_file: /etc/elastalert/rules/umu_smtp_auth_file.yaml
接下来就是创建systemd服务了
cd /etc/systemd/system
sudo touch elastalert.service
sudo vi elastalert.service
elastalert.service 内容
Description=elastalert
After=elasticsearch.service
Type=simple
Group=root
Restart=on-failure
WorkingDirectory=/usr/local/dev/elastalert
ExecStart=/usr/bin/elastalert --config /etc/elastalert/config.yaml --rule /etc/elastalert/rules/my_rule.yaml
WantedBy=multi-user.target
保存退出。
sudo systemctl start elastalert
sudo systemctl status elastalert
不出意外的话,你将看到绿色的Active: active (running)
至此,日志的异常监控服务就搭建好了,好的,收工!
本文已收录于以下专栏:
相关文章推荐
转载请注明出处:http://blog.csdn.net/gamer_gyt
博主微博:/
Github:...
可视化监控报警插件 KAAE:Kibi + Kibana Alert & Report App for Elasticsearch。
&#160; 最近搭建了ELK系统,便想在此基础上做一个基于Kibana的可...
现在的系统,动则分布式,分布式情况下,错误的定位比传统的单机要麻烦很多。因为log文件分布在不同的feature的不同节点。一直想找一种方式来简化这个过程。
目前主流的log监控有:
Kibana 可视化监控报警插件 KAAE 的介绍与使用
filebeat配置:
filebeat.prospectors:
- input_type: log
&#160; paths:
&#160;&#160;&#160; - /var/log/secure
&#160; exclud...
ELK Stack最近因工作需要部署一个日志分析监控平台, 最后选型ELK Stack:
elasticsearch-2.1.1
logstash-2.1.1
kibana-4.3.1
redis-3...
随着公司业务发展,支撑公司业务的各种系统越来越多,为了保证公司的业务正常发展,急需要对这些线上系统的运行进行监控,做到问题的及时发现和处理,最大程度减少对业务的影响。日志监控告警系统基于的日志进行监控...
ELK(Elasticsearch+LogStash+Kibana),最近使用ELK处理了一些平台日志,下面以「mysql连接数监控」记录部署流程
日志监控和分析在保障业务稳定运行时,起到了很重要的作用,不过一般情况下日志都分散在各个生产服务器,且开发人员无法登陆生产服务器,这时候就需要一个集中式的日志收集装置,对日志中的关键字进行监控,触发异常...
他的最新文章
讲师:姜飞俊
您举报文章:
举报原因:
原文地址:
原因补充:
(最多只允许输入30个字)基于 ELK Stack 和 Spark Streaming 的日志处理平台设计与实现
概述大数据时代,随着数据量不断增长,存储与计算集群的规模也逐渐扩大,几百上千台的云计算环境已不鲜见。现在的集群所需要解决的问题不仅仅是高性能、高可靠性、高可扩展性,还需要面对易维护性以及数据平台内部的数据共享性等诸多挑战。优秀的系统运维平台既能实现数据平台各组件的集中式管理、方便系统运维人员日常监测、提升运维效率,又能反馈系统运行状态给系统开发人员。例如采集数据仓库的日志可以按照时间序列查看各数据库实例各种级别的日志数量与占比,采集 DB2 表空间数据分析可得到数据库集群健康状态,分析应用服务器的日志可以查看出错最多的模块、下载最多的文件、使用最多的功能等。大数据时代的业务与运维将紧密的结合在一起。日志1. 什么是日志日志是带时间戳的基于时间序列的机器数据,包括 IT 系统信息(服务器、网络设备、操作系统、应用软件)、物联网各种传感器信息。日志可以反映用户实际行为,是真实的数据。2. 日志处理方案演进图 1. 日志处理方案经历的版本迭代
日志处理 v1.0:日志没有集中式处理;只做事后追查,黑客入侵后删除日志无法察觉;使用数据库存储日志,无法胜任复杂事务处理。
日志处理 v2.0:使用 Hadoop 平台实现日志离线批处理,缺点是实时性差;使用 Storm 流处理框架、Spark 内存计算框架处理日志,但 Hadoop/Storm/Spark 都是编程框架,并不是拿来即用的平台。
日志处理 v3.0:使用日志实时搜索引擎分析日志,特点:第一是快,日志从产生到搜索分析出结果只有数秒延时;第二是大,每天处理 TB 日志量;第三是灵活,可搜索分析任何日志。作为代表的解决方案有 Splunk、ELK、SILK。图 2. 深度整合 ELK、Spark、Hadoop 构建日志分析系统ELK StackELK Stack 是开源日志处理平台解决方案,背后的商业公司是 Elastic(https://www.elastic.co/)。它由日志采集解析工具 Logstash、基于 Lucene 的全文搜索引擎 Elasticsearch、分析可视化平台 Kibana 组成。目前 ELK 的用户有 Adobe、Microsoft、Mozilla、Facebook、Stackoverflow、Cisco、ebay、Uber 等诸多知名厂商。1. LogstashLogstash 是一种功能强大的信息采集工具,类似于 Hadoop 生态圈里的 Flume。通常在其配置文件规定 Logstash 如何处理各种类型的事件流,一般包含 input、filter、output 三个部分。Logstash 为各个部分提供相应的插件,因而有 input、filter、output 三类插件完成各种处理和转换;另外 codec 类的插件可以放在 input 和 output 部分通过简单编码来简化处理过程。下面以 DB2 的一条日志为例。图 3.DB2 数据库产生的半结构化日志样例这是一种多行的日志,每一条日志以:“-12.19.46.”格式的时间戳为起始标志。可以在 input 部分引入 codec 插件 multiline,来将一条日志的多行文本封装到一条消息(message)中。input {
path =& "path/to/filename"
codec =& multiline {
pattern =& "^\d{4}-\d{2}-\d{2}-\d{2}\.\d{2}\.\d{2}\.\d{6}[\+-]\d{3}"
negate =& true
what =& previous
}使用 file 插件导入文件形式的日志,而嵌入的 codec 插件 multiline 的参数 pattern 就规定了分割日志条目的时间戳格式。在 DataStage 多行日志的实际应用中,有时一条日志会超过 500 行,这超出了 multiline 组件默认的事件封装的最大行数,这需要我们在 multiline 中设置 max_lines 属性。经过 input 部分读入预处理后的数据流入 filter 部分,其使用 grok、mutate 等插件来过滤文本和匹配字段,并且我们自己可以为事件流添加额外的字段信息:filter {
gsub =& ['message', "\n", " "]
match =& { "message" =&
"(?&timestamp&%{YEAR}-%{MONTHNUM}-%{MONTHDAY}-%{HOUR}\.%{MINUTE}\.%{SECOND})%{INT:timezone}(?:%{SPACE}%{WORD:recordid}%{SPACE})(?:LEVEL%{SPACE}:%{SPACE}%{DATA:level}%{SPACE})(?:PID%{SPACE}:%{SPACE}%{INT:processid}%{SPACE})(?:TID%{SPACE}:%{SPACE}%{INT:threadid}%{SPACE})(?:PROC%{SPACE}:%{SPACE}%{DATA:process}%{SPACE})?(?:INSTANCE%{SPACE}:%{SPACE}%{WORD:instance}%{SPACE})?(?:NODE%{SPACE}:%{SPACE}%{WORD:node}%{SPACE})?(?:DB%{SPACE}:%{SPACE}%{WORD:dbname}%{SPACE})?(?:APPHDL%{SPACE}:%{SPACE}%{NOTSPACE:apphdl}%{SPACE})?(?:APPID%{SPACE}:%{SPACE}%{NOTSPACE:appid}%{SPACE})?(?:AUTHID%{SPACE}:%{SPACE}%{WORD:authid}%{SPACE})?(?:HOSTNAME%{SPACE}:%{SPACE}%{HOSTNAME:hostname}%{SPACE})?(?:EDUID%{SPACE}:%{SPACE}%{INT:eduid}%{SPACE})?(?:EDUNAME%{SPACE}:%{SPACE}%{DATA:eduname}%{SPACE})?(?:FUNCTION%{SPACE}:%{SPACE}%{DATA:function}%{SPACE})(?:probe:%{SPACE}%{INT:probe}%{SPACE})%{GREEDYDATA:functionlog}"
match =& [ "timestamp", "YYYY-MM-dd-HH.mm.ss.SSSSSS" ]
}前面 input 部分的 multiline 插件将一条多行日志项转化为一行,并以“\n”替代实际的换行符。为了便于后面处理,这里的 mutate 插件就是将这些“\n”替换为空格。而 grok 插件用于匹配提取日志项中有意义的字段信息。最后的 date 插件则是按格式“YYYY-MM-dd-HH.mm.ss.SSSSSS”解析提取的时间戳字段,并赋给系统默认的时间戳字段“@timestamp”。Output 插件用于指定事件流的去向,可以是消息队列、全文搜索引擎、TCP Socket、Email 等几十种目标端。2. ElasticsearchElasticsearch 是基于 Lucene 的近实时搜索平台,它能在一秒内返回你要查找的且已经在 Elasticsearch 做了索引的文档。它默认基于 Gossip 路由算法的自动发现机制构建配置有相同 cluster name 的集群,但是有的时候这种机制并不可靠,会发生脑裂现象。鉴于主动发现机制的不稳定性,用户可以选择在每一个节点上配置集群其他节点的主机名,在启动集群时进行被动发现。Elasticsearch 中的 Index 是一组具有相似特征的文档集合,类似于关系数据库模型中的数据库实例,Index 中可以指定 Type 区分不同的文档,类似于数据库实例中的关系表,Document 是存储的基本单位,都是 JSON 格式,类似于关系表中行级对象。我们处理后的 JSON 文档格式的日志都要在 Elasticsearch 中做索引,相应的 Logstash 有 Elasticsearch output 插件,对于用户是透明的。Hadoop 生态圈为大规模数据集的处理提供多种分析功能,但实时搜索一直是 Hadoop 的软肋。如今,Elasticsearch for Apache Hadoop(ES-Hadoop)弥补了这一缺陷,为用户整合了 Hadoop 的大数据分析能力以及 Elasticsearch 的实时搜索能力.图 4. 应用 es-hadoop 整合 Hadoop Ecosystem 与 Elasticsearch 架构图(https://www.elastic.co/products/hadoop)3. KibanaKibana 是专门设计用来与 Elasticsearch 协作的,可以自定义多种表格、柱状图、饼状图、折线图对存储在 Elasticsearch 中的数据进行深入挖掘分析与可视化。下图定制的仪表盘可以动态监测数据库集群中每个数据库实例产生的各种级别的日志。图 5. 实时监测 DB2 实例运行状态的动态仪表盘KafkaKafka 是 LinkedIn 开源的分布式消息队列,它采用了独特的消费者-生产者架构实现数据平台各组件间的数据共享。集群概念中的 server 在 Kafka 中称之为 broker,它使用主题管理不同类别的数据,比如 DB2 日志归为一个主题,tomcat 日志归为一个主题。我们使用 Logstash 作为 Kafka 消息的生产者时,output 插件就需要配置好 Kafka broker 的列表,也就是 Kafka 集群主机的列表;相应的,用作 Kafka 消费者角色的 Logstash 的 input 插件就要配置好需要订阅的 Kafka 中的主题名称和 ZooKeeper 主机列表。Kafka 通过将数据持久化到硬盘的 Write Ahead Log(WAL)保证数据可靠性与顺序性,但这并不会影响实时数据的传输速度,实时数据仍是通过内存传输的。Kafka 是依赖于 ZooKeeper 的,它将每组消费者消费的相应 topic 的偏移量保存在 ZooKeeper 中。据称 LinkedIn 内部的 Kafka 集群每天已能处理超过 1 万亿条消息。图 6. 基于消息订阅机制的 Kafka 架构除了可靠性和独特的 push&pull 架构外,相较于其他消息队列,Kafka 还拥有更大的吞吐量:图 7. 基于消息持久化机制的消息队列吞吐量比较Spark StreamingSpark 由加州大学伯克利分校 AMP 实验室 (Algorithms, Machines, and People Lab) 开发,可用来构建大型的、低延迟的数据分析应用程序。它将批处理、流处理、即席查询融为一体。Spark 社区也是相当火爆,平均每三个月迭代一次版本更是体现了它在大数据处理领域的地位。Spark Streaming 不同于 Storm,Storm 是基于事件级别的流处理,Spark Streaming 是 mini-batch 形式的近似流处理的微型批处理。Spark Streaming 提供了两种从 Kafka 中获取消息的方式:第一种是利用 Kafka 消费者高级 API 在 Spark 的工作节点上创建消费者线程,订阅 Kafka 中的消息,数据会传输到 Spark 工作节点的执行器中,但是默认配置下这种方法在 Spark Job 出错时会导致数据丢失,如果要保证数据可靠性,需要在 Spark Streaming 中开启Write Ahead Logs(WAL),也就是上文提到的 Kafka 用来保证数据可靠性和一致性的数据保存方式。可以选择让 Spark 程序把 WAL 保存在分布式文件系统(比如 HDFS)中。第二种方式不需要建立消费者线程,使用 createDirectStream 接口直接去读取 Kafka 的 WAL,将 Kafka 分区与 RDD 分区做一对一映射,相较于第一种方法,不需再维护一份 WAL 数据,提高了性能。读取数据的偏移量由 Spark Streaming 程序通过检查点机制自身处理,避免在程序出错的情况下重现第一种方法重复读取数据的情况,消除了 Spark Streaming 与 ZooKeeper/Kafka 数据不一致的风险。保证每条消息只会被 Spark Streaming 处理一次。以下代码片通过第二种方式读取 Kafka 中的数据:// Create direct kafka stream with brokers and topics
JavaPairInputDStream&String, String& messages = KafkaUtils.createDirectStream(
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams,
topicsSet);
messages.foreachRDD(new Function&JavaPairRDD&String,String&,Void&(){
public Void call(JavaPairRDD&String, String& v1)
throws Exception {
v1.foreach(new VoidFunction&Tuple2&String, String&&(){
public void call(Tuple2&String, String& tuple2) {
JSONObject a = new JSONObject(tuple2._2);
...Spark Streaming 获取到消息后便可以通过 Tuple 对象自定义操作消息,如下图是针对 DB2 数据库日志的邮件告警,生成告警邮件发送到 Notes 邮箱:图 8. 基于 Spark Streaming 对 DB2 异常日志实现 Notes 邮件告警互联网行业日志处理方案举例介绍与应用1. 新浪新浪采用的技术架构是常见的 Kafka 整合 ELK Stack 方案。Kafka 作为消息队列用来缓存用户日志;使用 Logstash 做日志解析,统一成 JSON 格式输出给 Elasticsearch;使用 Elasticsearch 提供实时日志分析与强大的搜索和统计服务;Kibana 用作数据可视化组件。该技术架构目前服务的用户包括微博、微盘、云存储、弹性计算平台等十多个部门的多个产品的日志搜索分析业务,每天处理约 32 亿条(2TB)日志。新浪的日志处理平台团队对 Elasticsearch 做了大量优化(比如调整 max open files 等),并且开发了一个独立的 Elasticsearch Index 管理系统,负责索引日常维护任务(比如索引的创建、优化、删除、与分布式文件系统的数据交换等)的调度及执行。为 Elasticsearch 安装了国内中文分词插件 elasticsearch-analysis-ik,满足微盘搜索对中文分词的需求。(见参考资源 2)2. 腾讯腾讯蓝鲸数据平台告警系统的技术架构同样基于分布式消息队列和全文搜索引擎。但腾讯的告警平台不仅限于此,它的复杂的指标数据统计任务通过使用 Storm 自定义流式计算任务的方法实现,异常检测的实现利用了曲线的时间周期性和相关曲线之间的相关性去定义动态的阈值,并且基于机器学习算法实现了复杂的日志自动分类(比如 summo logic)。告警平台把拨测(定时 curl 一下某个 url,有问题就告警)、日志集中检索、日志告警(5 分钟 Error 大于 X 次告警)、指标告警(cpu 使用率大于 X 告警)整合进同一个数据管线,简化了整体的架构。(见参考资源 3)3. 七牛七牛采用的技术架构为 Flume+Kafka+Spark,混部在 8 台高配机器。根据七牛技术博客提供的数据,该日志处理平台每天处理 500 亿条数据,峰值 80 万 TPS。&#160;Flume 相较于 Logstash 有更大的吞吐量,而且与 HDFS 整合的性能比 Logstash 强很多。七牛技术架构选型显然考虑了这一点,七牛云平台的日志数据到 Kafka 后,一路同步到 HDFS,用于离线统计,另一路用于使用 Spark Streaming 进行实时计算,计算结果保存在 Mongodb 集群中。(见参考资源 4)任何解决方案都不是十全十美的,具体采用哪些技术要深入了解自己的应用场景。就目前日志处理领域的开源组件来说,在以下几个方面还比较欠缺:
Logstash 的内部状态获取不到,目前没有好的成熟的监控方案。
Elasticsearch 具有海量存储海量聚合的能力,但是同 Mongodb 一样,并不适合于写入数据非常多(1 万 TPS 以上)的场景。
缺乏真正实用的异常检测方法;实时统计方面缺乏成熟的解决方案,Storm 就是一个底层的执行引擎,而 Spark 还缺少时间窗口等抽象。
对于日志自动分类,还没有开源工具可以做到 summo logic 那样的效果。结束语大数据时代的运维管理意义重大,好的日志处理平台可以事半功倍的提升开发人员和运维人员的效率。本文通过简单用例介绍了 ELK Stack、Kafka 和 Spark Streaming 在日志处理平台中各自在系统架构中的功能。现实中应用场景繁多复杂、数据形式多种多样,日志处理工作不是一蹴而就的,分析处理过程还需要在实践中不断挖掘和优化,笔者也将致力于 DB2 数据库运行状态更细节数据的收集和更全面细致的监控。
1. 参考“”,详细了解日志处理。
2. 查看文章“”,了解新浪的日志处理方案设计。
3. 查看文章“”,了解腾讯在大型系统运维工程上的实践。
4. 查看文章“”,了解七牛海量日志处理解决方案。
5. 查看文章“”,了解去哪儿网利用日志在集群管理中的实践。
6. 参考网站“”,了解基于消息持久化的各消息队列各种负载下的性能分析。在 ,了解关于大数据的更多信息,获取技术文档、how-to 文章、培训、下载、产品信息以及其他资源。
添加或订阅评论,请先或。
有新评论时提醒我
static.content.url=/developerworks/js/artrating/SITE_ID=10Zone=Big data and analytics, Information ManagementArticleID=1023468ArticleTitle=基于 ELK Stack 和 Spark Streaming 的日志处理平台设计与实现publish-date=}

我要回帖

更多关于 grasshopper插件elk 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信