博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Hadoop生态圈-Flume的组件之拦截器与选择器
阅读量:7022 次
发布时间:2019-06-28

本文共 4858 字,大约阅读时间需要 16 分钟。

                   Hadoop生态圈-Flume的组件之拦截器与选择器

                                            作者:尹正杰

版权声明:原创作品,谢绝转载!否则将追究法律责任。

 

 

  本篇博客只是配置的是Flume主流的Interceptors,想要了解更详细的配置信息请参考官网:http://flume.apache.org/FlumeUserGuide.html#flume-interceptors。

   想必大家都知道Flume的组件有Source,channel和sink。其实在Flume还有一些更深层的东西,比如你知道soucre是如何将数据传送给channel的吗?那你有知道channel又是如何将数据发送给sink的吗?对于一个Agent来说,它只能有一个source,但是它可以有多个channel和sink,如下图: 

   接下来就跟着我一起了了解一下更深层次的知识吧。接下来我们就一起探讨一下source是如何将数据发送到channel中的,以及sink是处理数据的。

 

一.Source端源码查看

1>.获取一行数据,使用其构建Event

2>.使用processEvent处理数据

 

3>.在处理过程中,event需要通过拦截器链,相当于过滤数据

4>.在拦截器链中,通过迭代所有拦截器,对数据进行多次处理(例如:host拦截器,是对event进行添加头部操作)

5>.通过拦截器处理后的event,再次进入到通道挑选器

 

6>.迭代所有channel,将数据放进channel中

  通过上面的源码解析,看下面这张图应该就不是什么难事了吧:

  这个时候,你是否绝对第一张图画得并不自信呢?这个时候我们可以把第一张图的Source端流程画得更详细一点,如下:

 

二.拦截器(Interceptors)

1>.Interceptors 功能

  答:拦截器是在source端的在处理过程中能够对数据(event)进行修改或丢弃的组件。

2>.官方文档

 

3>.host interceptor(将发送的event添加主机名的header)配置案例

  a>.实际配置参数:

[yinzhengjie@s101 ~]$ more /soft/flume/conf/yinzhengjie_hostInterceptor.conf# Name the components on this agenta1.sources = r1a1.sinks = k1a1.channels = c1# Describe/configure the sourcea1.sources.r1.type = netcata1.sources.r1.bind = localhosta1.sources.r1.port = 8888# 指定添加拦截器a1.sources.r1.interceptors = i1a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Buildera1.sources.r1.interceptors.i1.preserveExisting = false# 指定header的keya1.sources.r1.interceptors.i1.hostHeader = hostname# 指定header的value为主机ipa1.sources.r1.interceptors.i1.useIP = true# Describe the sinka1.sinks.k1.type = logger# Use a channel which buffers events in memorya1.channels.c1.type = memorya1.channels.c1.capacity = 100000a1.channels.c1.transactionCapacity = 10000# Bind the source and sink to the channela1.sources.r1.channels = c1a1.sinks.k1.channel = c1[yinzhengjie@s101 ~]$

  b>启动agent进程:

 

  c>.source端产生数据(启动nc):

 

  d>.检查sink端数据(检查定义好的目录"/home/yinzhengjie/log2")

 

4>.static interceptor(静态拦截器,手动指定key-value)配置案例

  a>.实际配置参数:

 

[yinzhengjie@s101 ~]$ more /soft/flume/conf/yinzhengjie_staticInterceptor.conf# Name the components on this agenta1.sources = r1a1.sinks = k1a1.channels = c1# Describe/configure the sourcea1.sources.r1.type = netcata1.sources.r1.bind = localhosta1.sources.r1.port = 8888# 指定添加拦截器a1.sources.r1.interceptors = i1a1.sources.r1.interceptors.i1.type = statica1.sources.r1.interceptors.i1.key = namea1.sources.r1.interceptors.i1.value = yinzhengjie# Describe the sinka1.sinks.k1.type = logger# Use a channel which buffers events in memorya1.channels.c1.type = memorya1.channels.c1.capacity = 100000a1.channels.c1.transactionCapacity = 10000# Bind the source and sink to the channela1.sources.r1.channels = c1a1.sinks.k1.channel = c1[yinzhengjie@s101 ~]$

 

  b>启动agent进程:

 

  c>.source端产生数据(启动nc): 

 

  d>.检查sink端数据(检查定义好的目录"/home/yinzhengjie/log2")

 

5>.timestamp interceptor(将发送的event添加时间戳的header)配置案例

  a>.实际配置参数:

 

[yinzhengjie@s101 ~]$ more /soft/flume/conf/yinzhengjie_timestampInterceptor.conf# Name the components on this agenta1.sources = r1a1.sinks = k1a1.channels = c1# Describe/configure the sourcea1.sources.r1.type = netcata1.sources.r1.bind = localhosta1.sources.r1.port = 8888# 指定添加拦截器a1.sources.r1.interceptors = i1a1.sources.r1.interceptors.i1.type = timestamp# Describe the sinka1.sinks.k1.type = logger# Use a channel which buffers events in memorya1.channels.c1.type = memorya1.channels.c1.capacity = 100000a1.channels.c1.transactionCapacity = 10000# Bind the source and sink to the channela1.sources.r1.channels = c1a1.sinks.k1.channel = c1[yinzhengjie@s101 ~]$

 

 

  b>启动agent进程:

 

 

 

  c>.source端产生数据(启动nc): 

 

 

 

  d>.检查sink端数据(检查定义好的目录"/home/yinzhengjie/log2")

 

 

6>.interceptor chain(连接器链)配置案例

  a>.实际配置参数:

[yinzhengjie@s101 ~]$ more /soft/flume/conf/yinzhengjie_chainInterceptor.conf# Name the components on this agenta1.sources = r1a1.sinks = k1a1.channels = c1# Describe/configure the sourcea1.sources.r1.type = netcata1.sources.r1.bind = localhosta1.sources.r1.port = 8888# 指定添加拦截器a1.sources.r1.interceptors = i1 i2 i3a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Buildera1.sources.r1.interceptors.i1.preserveExisting = false# 指定header的keya1.sources.r1.interceptors.i1.hostHeader = hostname# 指定header的value为主机ipa1.sources.r1.interceptors.i1.useIP = true# 添加i2拦截器a1.sources.r1.interceptors.i2.type = timestamp# 添加i3拦截器a1.sources.r1.interceptors.i3.type = remove_headera1.sources.r1.interceptors.i3.withName = timestamp# Describe the sinka1.sinks.k1.type = logger# Use a channel which buffers events in memorya1.channels.c1.type = memorya1.channels.c1.capacity = 100000a1.channels.c1.transactionCapacity = 10000# Bind the source and sink to the channela1.sources.r1.channels = c1a1.sinks.k1.channel = c1[yinzhengjie@s101 ~]$

 

  b>启动agent进程:

 

  c>.source端产生数据(启动nc): 

 

  d>.检查sink端数据(检查定义好的目录"/home/yinzhengjie/log2")

 

转载于:https://www.cnblogs.com/yinzhengjie/p/9205536.html

你可能感兴趣的文章
用JDBC写的对oracle数据库增删改查的小程序
查看>>
关于Nginx的一些优化
查看>>
J2EE系统异常的处理准则
查看>>
TCP三次握手连接及seq和ack号的正确理解
查看>>
打印目录中文件大小大于多少的文件
查看>>
非阻塞式JavaScript脚本及延伸知识
查看>>
专线与***冗余方案
查看>>
Go语言中的方法(Method Sets)
查看>>
嵌入式系统构件学习推荐的书 ucos的作者所著
查看>>
Android中内容提供者ContentProvider的理解与基本使用
查看>>
我的友情链接
查看>>
我的友情链接
查看>>
使用yaml语言来写配置文件
查看>>
go 入门学习笔记之 变量/常量/基本类型 (五)
查看>>
Android ActionBar居中显示标题
查看>>
硬盘的存储原理和内部架构
查看>>
爆笑配音神作来袭,新白娘子传奇相亲记
查看>>
android线程使用注意问题?【安卓进化二】
查看>>
常用JS方法
查看>>
hive优化--增加减少map数
查看>>