Logstash 读取 Kafka 数据写入 HDFS - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
请不要在回答技术问题时复制粘贴 AI 生成的内容
37Y37
V2EX    程序员

Logstash 读取 Kafka 数据写入 HDFS

  •  
  •   37Y37 2019-03-27 09:21:02 +08:00 1876 次点击
    这是一个创建于 2393 天前的主题,其中的信息可能已经有所发展或是发生改变。

    强大的功能,丰富的插件,让 logstash 在数据处理的行列中出类拔萃

    通常日志数据除了要入 ES 提供实时展示和简单统计外,还需要写入大数据集群来提供更为深入的逻辑处理,前边几篇 ELK 的文章介绍过利用 logstash 将 kafka 的数据写入到 elasticsearch 集群,这篇文章将会介绍如何通过 logstash 将数据写入 HDFS

    本文所有演示均基于 logstash 6.6.2 版本

    数据收集

    logstash 默认不支持数据直接写入 HDFS,官方推荐的 output 插件是webhdfs,webhdfs 使用 HDFS 提供的 API 将数据写入 HDFS 集群

    插件安装

    插件安装比较简单,直接使用内置命令即可

    # cd /home/opt/tools/logstash-6.6.2 # ./bin/logstash-plugin install logstash-output-webhdfs 

    配置 hosts

    HDFS 集群内通过主机名进行通信所以 logstash 所在的主机需要配置 hadoop 集群的 hosts 信息

    # cat /etc/hosts 192.168.107.154 master01 192.168.107.155 slave01 192.168.107.156 slave02 192.168.107.157 slave03 

    如果不配置 host 信息,可能会报下边的错

    [WARN ][logstash.outputs.webhdfs ] Failed to flush outgoing items 

    logstash 配置

    kafka 里边的源日志格式可以参考这片文章:ELK 日志系统之使用 Rsyslog 快速方便的收集 Nginx 日志

    logstash 的配置如下:

    # cat config/indexer_rsyslog_nginx.conf input { kafka { bootstrap_servers => "10.82.9.202:9092,10.82.9.203:9092,10.82.9.204:9092" topics => ["rsyslog_nginx"] codec => "json" } } filter { date { match => ["time_local","dd/MMM/yyyy:HH:mm:ss Z"] target => "time_local" } ruby { code => "event.set('index.date', event.get('time_local').time.localtime.strftime('%Y%m%d'))" } ruby { code => "event.set('index.hour', event.get('time_local').time.localtime.strftime('%H'))" } } output { webhdfs { host => "master01" port => 50070 user => "hadmin" path => "/logs/nginx/%{index.date}/%{index.hour}.log" codec => "json" } stdout { codec => rubydebug } } 

    logstash 配置文件分为三部分:input、filter、output

    input指定源在哪里,我们是从 kafka 取数据,这里就写 kafka 集群的配置信息,配置解释:

    • bootstrap_servers:指定 kafka 集群的地址
    • topics:需要读取的 topic 名字
    • codec:指定下数据的格式,我们写入的时候直接是 json 格式的,这里也配置 json 方便后续处理

    filter可以对 input 输入的内容进行过滤或处理,例如格式化,添加字段,删除字段等等

    • 这里我们主要是为了解决生成 HDFS 文件时因时区不对差 8 小时导致的文件名不对的问题,后边有详细解释

    output指定处理过的日志输出到哪里,可以是 ES 或者是 HDFS 等等,可以同时配置多个,webhdfs 主要配置解释:

    • host:为 hadoop 集群 namenode 节点名称
    • user:为启动 hdfs 的用户名,不然没有权限写入数据
    • path:指定存储到 HDFS 上的文件路径,这里我们每日创建目录,并按小时存放文件
    • stdout:打开主要是方便调试,启动 logstash 时会在控制台打印详细的日志信息并格式化方便查找问题,正式环境建议关闭

    webhdfs 还有一些其他的参数例如compression,flush_size,standby_host,standby_port等可查看官方文档了解详细用法

    启动 logstash

    # bin/logstash -f config/indexer_rsyslog_nginx.conf 

    因为 logstash 配置中开了stdout输出,所以能在控制台看到格式化的数据,如下:

    { "server_addr" => "172.18.90.17", "http_user_agent" => "Mozilla/5.0 (iPhone; CPU iPhone OS 10_2 like Mac OS X) AppleWebKit/602.3.12 (KHTML, like Gecko) Mobile/14C92 Safari/601.1 wechatdevtools/1.02.1902010 MicroMessenger/6.7.3 Language/zh_CN webview/ token/e7b92168159736c30401a55589317d8c", "remote_addr" => "172.18.101.0", "status" => 200, "http_referer" => "https://ops-coffee.cn/wx02935bb29080a7b4/devtools/page-frame.html", "upstream_response_time" => "0.056", "host" => "ops-coffee.cn", "request_uri" => "/api/community/v2/news/list", "request_time" => 0.059, "upstream_status" => "200", "@version" => "1", "http_x_forwarded_for" => "192.168.106.100", "time_local" => 2019-03-18T11:03:45.000Z, "body_bytes_sent" => 12431, "@timestamp" => 2019-03-18T11:03:45.984Z, "index.date" => "20190318", "index.hour" => "19", "request_method" => "POST", "upstream_addr" => "127.0.0.1:8181" } 

    查看 hdfs 发现数据已经按照定义好的路径正常写入

    $ hadoop fs -ls /logs/nginx/20190318/19.log -rw-r--r-- 3 hadmin supergroup 7776 2019-03-18 19:07 /logs/nginx/20190318/19.log 

    至此 kafka 到 hdfs 数据转储完成

    遇到的坑

    HDFS 按小时生成文件名不对

    logstash 在处理数据时会自动生成一个字段@timestamp,默认情况下这个字段存储的是 logstash 收到消息的时间,使用的是 UTC 时区,会跟国内的时间差 8 小时

    我们 output 到 ES 或者 HDFS 时通常会使用类似于rsyslog-nginx-%{+YYYY.MM.dd}这样的变量来动态的设置 index 或者文件名,方便后续的检索,这里的变量YYYY使用的就是@timestamp中的时间,因为时区的问题生成的 index 或者文件名就差 8 小时不是很准确,这个问题在 ELK 架构中因为全部都是用的 UTC 时间且最终 kibana 展示时会自动转换我们无需关心,但这里要生成文件就需要认真对待下了

    这里采用的方案是解析日志中的时间字段time_local,然后根据日志中的时间字段添加两个新字段index.dateindex.hour来分别标识日期和小时,在 output 的时候使用这两个新加的字段做变量来生成文件

    logstash filter 配置如下:

    filter { # 匹配原始日中的 time_local 字段并设置为时间字段 # time_local 字段为本地时间字段,没有 8 小时的时间差 date { match => ["time_local","dd/MMM/yyyy:HH:mm:ss Z"] target => "time_local" } # 添加一个 index.date 字段,值设置为 time_local 的日期 ruby { code => "event.set('index.date', event.get('time_local').time.localtime.strftime('%Y%m%d'))" } # 添加一个 index.hour 字段,值设置为 time_local 的小时 ruby { code => "event.set('index.hour', event.get('time_local').time.localtime.strftime('%H'))" } } 

    output 的 path 中配置如下

    path => "/logs/nginx/%{index.date}/%{index.hour}.log" 

    HDFS 记录多了时间和 host 字段

    在没有指定 codec 的情况下,logstash 会给每一条日志添加时间和 host 字段,例如:

    源日志格式为

    ops-coffee.cn | 192.168.105.91 | 19/Mar/2019:14:28:07 +0800 | GET / HTTP/1.1 | 304 | 0 | - | 0.000 

    经过 logstash 处理后多了时间和 host 字段

    2019-03-19T06:28:07.510Z %{host} ops-coffee.cn | 192.168.105.91 | 19/Mar/2019:14:28:07 +0800 | GET / HTTP/1.1 | 304 | 0 | - | 0.000 

    如果不需要我们可以指定最终的 format 只取 message,解决方法为在 output 中添加如下配置:

    codec => line { format => "%{message}" } 

    同时 output 到 ES 和 HDFS

    在实际应用中我们需要同时将日志数据写入 ES 和 HDFS,那么可以直接用下边的配置来处理

    # cat config/indexer_rsyslog_nginx.conf input { kafka { bootstrap_servers => "localhost:9092" topics => ["rsyslog_nginx"] codec => "json" } } filter { date { match => ["time_local","dd/MMM/yyyy:HH:mm:ss Z"] target => "@timestamp" } ruby { code => "event.set('index.date', event.get('@timestamp').time.localtime.strftime('%Y%m%d'))" } ruby { code => "event.set('index.hour', event.get('@timestamp').time.localtime.strftime('%H'))" } } output { elasticsearch { hosts => ["192.168.106.203:9200"] index => "rsyslog-nginx-%{+YYYY.MM.dd}" } webhdfs { host => "master01" port => 50070 user => "hadmin" path => "/logs/nginx/%{index.date}/%{index.hour}.log" codec => "json" } } 

    这里我使用 logstash 的 date 插件将日志中的"time_local"字段直接替换为了 @timestamp,这样做有什么好处呢?

    logstash 默认生成的 @timestamp 字段记录的时间是 logstash 接收到消息的时间,这个时间可能与日志产生的时间不同,而我们往往需要关注的时间是日志产生的时间,且在 ELK 架构中 Kibana 日志输出的默认顺序就是按照 @timestamp 来排序的,所以往往我们需要将默认的 @timestamp 替换成日志产生的时间,替换方法就用到了 date 插件,date 插件的用法如下

    date { match => ["time_local","dd/MMM/yyyy:HH:mm:ss Z"] target => "@timestamp" } 

    match:匹配日志中的时间字段,这里为 time_local

    target:将 match 匹配到的时间戳存储到给定的字段中,默认不指定的话就存到 @timestamp 字段

    另外还有参数可以配置:timezone,locale,tag_on_failure等,具体可查看官方文档


    目前尚无回复
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     2985 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 43ms UTC 13:40 PVG 21:40 LAX 06:40 JFK 09:40
    Do have faith in what you're doing.
    ubao snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86