# 1. Steps for Writing a Flume Agent
# 1.0.0 Flume Transactions
Flume guarantees delivery between components with two kinds of transactions: a put transaction between the source and the channel, and a take transaction between the channel and the sink; if a batch fails, the transaction is rolled back and retried.
# 1.1 Draw the Topology Diagram
Summary: one channel can only produce one output file.
A Flume agent consists of source + channel + sink, analogous to mapper + shuffle + reducer.
# 1.1.1 Choose the Source Type
Common types:
1) avro: used to pass data between Flume agents
2) netcat: listens on a port
3) exec: runs a Linux command (e.g. `tail -F`)
4) spooldir: watches a directory for newly added files
5) taildir: watches files or directories and also supports appended data
Summary: of types 3/4/5, taildir (5) is used the most; it is suited to tailing multiple files that are appended to in real time, and it can resume from a checkpoint after a restart.
# 1.1.2 Choose the Channel Selector
1) replicating channel selector: replicates the data, sending one copy to every channel
2) multiplexing channel selector: based on the configuration, sends the data obtained by the source to one or more specific channels
# 1.1.3 Choose the Channel Type
1) Memory Channel: events are held in memory; there is a risk of data loss
2) File Channel: events are persisted to disk
# 1.1.4 Choose the Sink Processor
1) DefaultSinkProcessor: for a single sink
2) LoadBalancingSinkProcessor: for multiple sinks; provides load balancing
3) FailoverSinkProcessor: for multiple sinks; provides failover. One sink is designated and all data goes through it; when that sink fails, another sink takes over, and once the original sink recovers, data goes back through it.
# 1.1.5 Choose the Sink Type
Common types:
1) avro: sends to the next Flume agent (Avro is an open-source serialization framework)
2) hdfs: writes to HDFS
3) file_roll: writes to the local filesystem
4) logger: writes to the console
5) hbase: writes to HBase
# 1.1.6 Example Topology Diagram
Figure 3 (example topology diagram)
# 1.2 Write the Configuration File
# 1.2.1 Structure of the Configuration File
- Name the components on this agent -- agent name
- Describe/configure the source -- source
- channel selector
- Describe the channel -- channel
- sinkprocessor
- Describe the sink -- sink
- Bind the source and sink to the channel -- wire the source, channel, and sink together (a complete minimal example follows)
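Putting the seven parts together, a minimal end-to-end agent looks like this (a sketch only; the agent name `a1`, port `44444`, and the netcat -> memory -> logger pipeline are chosen purely for illustration):

```properties
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source (listen on a local port)
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the channel (in-memory buffer)
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Describe the sink (print events to the console)
a1.sinks.k1.type = logger

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```

The channel selector and sinkprocessor lines are omitted here because their defaults (replicating selector, DefaultSinkProcessor) apply when there is only one channel and one sink.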
# 1.2.2 Agent Name
Case 1: one source, one channel, one sink
```properties
a1.sources = r1
a1.sinks = k1
a1.channels = c1
```
Case 2: one source, one channel, multiple sinks
```properties
a1.sources = r1
a1.channels = c1
a1.sinkgroups = g1
a1.sinks = k1 k2
```
Case 3: one source, multiple channels, multiple sinks
```properties
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
```
# 1.2.3 source
Case 1: avro
```properties
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop102
a1.sources.r1.port = 4141
```
Case 2: netcat
```properties
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
```
Case 3: exec
```properties
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive/logs/hive.log
a1.sources.r1.shell = /bin/bash -c
```
Case 4: spooldir
```properties
a1.sources.r1.type = spooldir
# directory to watch for newly added files
a1.sources.r1.spoolDir = /opt/module/flume/upload
a1.sources.r1.fileSuffix = .COMPLETED
a1.sources.r1.fileHeader = true
# ignore files ending in .tmp
a1.sources.r1.ignorePattern = ([^ ]*\.tmp)
```
Case 5: taildir
```properties
a1.sources.r1.type = TAILDIR
# JSON file where read positions are checkpointed (enables resuming after a restart)
a1.sources.r1.positionFile = /opt/module/flume/tail_dir.json
a1.sources.r1.batchSize = 500
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /opt/module/flume/files/.*file.*
a1.sources.r1.filegroups.f2 = /opt/module/flume/files/.*log.*
```
# 1.2.4 channel selector
Case 1: replicating channel selector
```properties
# replicate the data flow to every channel
a1.sources.r1.selector.type = replicating
```
Case 2: multiplexing channel selector, which must be used together with an interceptor
```properties
# interceptor that puts a header on every event
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.miyazono.flume.interceptor.CustomInterceptor$Builder

# route events by the value of the "type" header
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.letter = c1
a1.sources.r1.selector.mapping.number = c2
```
# 1.2.5 channel
Case 1: memory
```properties
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
```
Case 2: file
```properties
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1/
a1.channels.c1.capacity = 1000000
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.keep-alive = 6
```
# 1.2.6 sinkprocessor
Case 1: DefaultSinkProcessor -- for a single sink
No configuration is needed; it is the default.
Case 2: FailoverSinkProcessor -- for a sink group
```properties
a1.sinkgroups.g1.processor.type = failover
# the sink with the higher priority is used first (k2 here)
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000
```
Case 3: LoadBalancingSinkProcessor -- for a sink group
```properties
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin
```
# 1.2.7 sink
Case 1: avro
```properties
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop104
a1.sinks.k1.port = 4141
```
Case 2: hdfs
```properties
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs:
# prefix of the files uploaded to HDFS
a1.sinks.k1.hdfs.filePrefix = upload-
# roll folders based on time
a1.sinks.k1.hdfs.round = true
# create a new folder every 1 time unit
a1.sinks.k1.hdfs.roundValue = 1
# the time unit used for rolling folders
a1.sinks.k1.hdfs.roundUnit = hour
# use the local timestamp instead of a timestamp header in the event
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# number of events flushed to HDFS per batch
a1.sinks.k1.hdfs.batchSize = 100
# write a plain data stream (not a SequenceFile)
a1.sinks.k1.hdfs.fileType = DataStream
# roll to a new file every 60 seconds
a1.sinks.k1.hdfs.rollInterval = 60
# roll to a new file at about 128 MB
a1.sinks.k1.hdfs.rollSize = 134217700
# never roll based on the number of events
a1.sinks.k1.hdfs.rollCount = 0
```
Case 3: file_roll
```properties
a1.sinks.k1.type = file_roll
a1.sinks.k1.sink.directory = /opt/module/flume/datas/flume3
```
Case 4: logger
```properties
a1.sinks.k1.type = logger
```
Case 5: hbase --- not covered here
# 1.2.8 Bind the Source, Channel, and Sink Together
Case 1: one source, one channel, one sink
```properties
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
Case 2: one source, one channel, multiple sinks
```properties
a1.sources.r1.channels = c1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
```
Case 3: one source, multiple channels, multiple sinks
```properties
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
```
# 1.2.9 The Difference Between Port and IP
- Source side: the source watches the specified port and receives the data sent to the specified ip.
  - Port: must be a port on the machine running the agent itself.
  - ip (hostname): the address on which the source can accept data, i.e. the address it binds to (see the sketch below).
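As a small illustration (a sketch; `0.0.0.0` and `44444` are example values): binding a netcat source to `0.0.0.0` makes it listen on every local interface, so other machines can send data to the agent, whereas `bind = localhost` only accepts connections made from the agent's own machine.

```properties
a1.sources.r1.type = netcat
# 0.0.0.0 = listen on all local interfaces; the port is still a port on this machine
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444
```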
# 1.3 Connecting to Flume
# 1.3.1 Check Which Process Is Listening on a Port
```bash
sudo netstat -ntlp | grep <port>
```
# 1.3.2 Close the Port
```bash
sudo kill <pid-of-the-process-using-the-port>
```
# 1.3.3 Connect to a Specified Port on a Specified IP
```bash
nc <ip> <port>
```
# 1.3.4 Start Flume
```bash
bin/flume-ng agent -n <agent-name> -c conf -f <your-flume-config-file> -Dflume.root.logger=INFO,console
```
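For example, starting the minimal netcat agent from section 1.2.1 could look like this (the file name `job/netcat-logger.conf` is only an assumed location for that config):

```bash
bin/flume-ng agent -n a1 -c conf/ -f job/netcat-logger.conf -Dflume.root.logger=INFO,console
# from another terminal, open a connection to the netcat source and type test lines
nc localhost 44444
```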
# 2. Custom Interceptor, Source, and Sink
# 2.1 Custom Interceptor
```java
package flume_interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.List;
import java.util.Map;

/**
 * @author lianzhipeng
 * @Description custom interceptor that tags every event with a "type" header
 * @create 2020-05-05 10:45
 */
public class MyInterceptor implements Interceptor {

    /**
     * Initialization method, called when the Interceptor is created.
     */
    @Override
    public void initialize() {
    }

    /**
     * Processes a single event: looks at the first character of the body
     * and puts a "type" header on the event.
     *
     * @param event the incoming event
     * @return the processed event
     */
    @Override
    public Event intercept(Event event) {
        // headers of the event
        Map<String, String> headers = event.getHeaders();
        // body of the event
        byte[] body = event.getBody();

        String string = new String(body);
        char c = string.charAt(0);

        if (c >= 'a' && c <= 'z' || c >= 'A' && c <= 'Z') {
            headers.put("type", "char");
        } else {
            headers.put("type", "not-char");
        }

        return event;
    }

    /**
     * Processes a batch of events by applying intercept(Event) to each one.
     */
    @Override
    public List<Event> intercept(List<Event> list) {
        for (Event event : list) {
            intercept(event);
        }
        return list;
    }

    @Override
    public void close() {
    }

    /**
     * The framework calls MyBulider to create instances of the custom interceptor.
     */
    public static class MyBulider implements Interceptor.Builder {

        /**
         * Creates an instance of the custom interceptor.
         */
        @Override
        public Interceptor build() {
            return new MyInterceptor();
        }

        /**
         * Reads configuration properties.
         */
        @Override
        public void configure(Context context) {
        }
    }
}
```
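To plug this interceptor into the multiplexing selector from section 1.2.4, the config would look roughly like this (a sketch: it assumes the class above has been packaged into a jar placed in Flume's lib/ directory, and that channels c1 and c2 exist):

```properties
a1.sources.r1.interceptors = i1
# fully qualified name of the nested Builder class
a1.sources.r1.interceptors.i1.type = flume_interceptor.MyInterceptor$MyBulider

a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
# header values written by MyInterceptor
a1.sources.r1.selector.mapping.char = c1
a1.sources.r1.selector.mapping.not-char = c2
```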
# 2.2 Custom Source
```java
package flume_interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;

/**
 * @author lianzhipeng
 * @Description custom source that generates "prefix + random number" events
 * @create 2020-05-05 14:31
 */
public class MySource extends AbstractSource implements Configurable, PollableSource {

    private String prefix;
    private Long interval;

    /**
     * Pulls events and hands them to the ChannelProcessor.
     *
     * @return the status of this poll (READY or BACKOFF)
     */
    @Override
    public Status process() throws EventDeliveryException {
        Status status = null;

        try {
            // fetch one event
            Event e = getSomeData();

            // the ChannelProcessor writes the event into the channel(s)
            ChannelProcessor channelProcessor = getChannelProcessor();
            channelProcessor.processEvent(e);

            status = Status.READY;
        } catch (Throwable t) {
            // back off on failure
            status = Status.BACKOFF;

            if (t instanceof Error) {
                throw (Error) t;
            }
        }

        return status;
    }

    /**
     * Pulls the data and wraps it in an event.
     *
     * @return the event that was pulled
     */
    private Event getSomeData() throws InterruptedException {
        // simulate fetching data
        int i = (int) (Math.random() * 1000);
        String message = prefix + i;

        Thread.sleep(interval);

        SimpleEvent event = new SimpleEvent();
        event.setBody(message.getBytes());
        return event;
    }

    /**
     * How fast the backoff time grows when no data can be pulled.
     *
     * @return the increment in milliseconds
     */
    @Override
    public long getBackOffSleepIncrement() {
        return 1000;
    }

    /**
     * The maximum backoff wait time.
     */
    @Override
    public long getMaxBackOffSleepInterval() {
        return 10000;
    }

    /**
     * Reads the properties of this custom source from the agent configuration
     * (comes from Configurable).
     *
     * @param context the configuration
     */
    @Override
    public void configure(Context context) {
        prefix = context.getString("prefff", "xxxx");
        interval = context.getLong("interval", 500L);
    }
}
```
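The custom source is then referenced from the agent config by its fully qualified class name (a sketch; the `prefff` and `interval` keys are the ones read in `configure()` above, the values are only examples, and the jar must be on Flume's classpath):

```properties
a1.sources = r1
a1.sources.r1.type = flume_interceptor.MySource
# optional: prefix for the generated message (default "xxxx")
a1.sources.r1.prefff = log-
# optional: delay between events in ms (default 500)
a1.sources.r1.interval = 1000
```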
# 2.3 Custom Sink
```java
package flume_interceptor;

import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

import java.io.IOException;

/**
 * @author lianzhipeng
 * @Description custom sink that prints event bodies to the console
 * @create 2020-05-05 14:31
 */
public class MySiink extends AbstractSink implements Configurable {

    /**
     * When this method is called, it pulls data from the Channel and processes it.
     *
     * @return the status of this attempt (READY or BACKOFF)
     */
    @Override
    public Status process() throws EventDeliveryException {
        Status status = null;

        // the channel this sink reads from
        Channel ch = getChannel();
        // every take must happen inside a transaction
        Transaction txn = ch.getTransaction();
        txn.begin();
        try {
            // keep polling until an event is available
            Event event;
            while ((event = ch.take()) == null) {
                Thread.sleep(100);
            }

            // "store" the event (here: print it)
            storeSomeData(event);

            txn.commit();
            status = Status.READY;
        } catch (Throwable t) {
            // roll back the transaction so the event is not lost
            txn.rollback();
            status = Status.BACKOFF;

            if (t instanceof Error) {
                throw (Error) t;
            }
        } finally {
            txn.close();
        }
        return status;
    }

    /**
     * Writes the event body to standard output.
     */
    private void storeSomeData(Event event) throws IOException {
        byte[] body = event.getBody();
        System.out.write(body);
        System.out.println();
    }

    @Override
    public void configure(Context context) {
    }
}
```
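The custom sink is wired in the same way, by fully qualified class name (a sketch; the agent and channel names are reused from earlier sections, and the jar again has to be on Flume's classpath):

```properties
a1.sinks = k1
a1.sinks.k1.type = flume_interceptor.MySiink
a1.sinks.k1.channel = c1
```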
# 3. Combining Kafka and Flume
Kafka: a relay station for data; its main functionality is carried by topics.
Flume: data collection; carried out by sources and sinks.
# 3.1 kafka source
Configuration:
```properties
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers = hadoop105:9092,hadoop106:9092
a1.sources.r1.kafka.topics = topic_log
a1.sources.r1.batchSize = 6000
a1.sources.r1.batchDurationMillis = 2000
```
# 3.2 kafka channel
- The kafka channel is the most commonly used of the three. Here Flume can act as a consumer, as a producer, or use Kafka as the buffer between source and sink (gaining Kafka's high throughput); the Channel is the buffer that sits between the Source and the Sink.
- It can be used in three ways:
  1) With a Flume source and sink: the kafka channel provides a reliable and highly available channel for events.
  2) With a Flume source but no sink: it allows writing Flume events into a Kafka topic, for use by other apps (an example of this mode follows the configuration below).
  3) With a Flume sink but no source: it is a low-latency, fault-tolerant way to send events from Kafka to Flume sinks such as HDFS, HBase or Solr.
Configuration:
```properties
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop105:9092,hadoop106:9092,hadoop107:9092
a1.channels.c1.kafka.topic = topic_log
# store plain payloads in Kafka instead of Avro-serialized Flume events
a1.channels.c1.parseAsFlumeEvent = false
```
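As an example of the second usage (source plus kafka channel, no sink), the channel above can be fed directly from a taildir source; this is only a sketch that reuses paths and server names from earlier sections for illustration:

```properties
a1.sources = r1
a1.channels = c1

# taildir source tailing the application logs
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/module/flume/tail_dir.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/flume/files/.*log.*

# the kafka channel delivers events straight into the topic, so no sink is defined
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop105:9092,hadoop106:9092,hadoop107:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false

# bind only the source to the channel
a1.sources.r1.channels = c1
```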
# 3.3 kafka sink
Purpose: writes the collected data into a Kafka topic.
Configuration:
```properties
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = topic_log
a1.sinks.k1.kafka.bootstrap.servers = hadoop105:9092,hadoop106:9092,hadoop107:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy
```