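1. Basic integration
Requirement:
Use Flume to collect input typed at the console (through a netcat source), store it in Kafka, and read it back with a Kafka consumer.
Flume configuration file: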
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444
# Describe the sink
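# Core setting: use the KafkaSink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# Target Kafka topic; if kafka.topic is not set, the sink falls back to default-flume-topic
a1.sinks.k1.kafka.topic = first
# Kafka cluster to connect to
a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
# Flume acts as a Kafka producer here, so producer-style parameters are configured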
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
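Experiment:
Start a Kafka consumer on the topic first.
Start the Flume agent.
Connect to port 44444 and send some data.
Confirm that the consumer receives the data.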
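Any TCP client can perform the "connect and send" step. As a minimal sketch, the following Java client writes newline-terminated lines to the netcat source. The class name NetcatLineSender, the host localhost, and the sample lines are assumptions made for illustration; only port 44444 comes from the configuration above. The sketch also assumes the server closes the connection once it sees end of input, and it gives up waiting after five seconds if that does not happen.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;
import java.util.List;

// Hypothetical helper, not part of Flume: sends lines of text over TCP.
final class NetcatLineSender {

    static void sendLines(String host, int port, List<String> lines) throws IOException {
        try (Socket socket = new Socket(host, port)) {
            // Do not wait forever for the server's replies.
            socket.setSoTimeout(5000);
            Writer out = new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8);
            for (String line : lines) {
                // The netcat source treats each newline-terminated line as one event.
                out.write(line);
                out.write('\n');
            }
            out.flush();
            // Signal end of input, then drain any replies until the server closes.
            socket.shutdownOutput();
            InputStream in = socket.getInputStream();
            byte[] buffer = new byte[256];
            try {
                while (in.read(buffer) != -1) {
                    // Replies are discarded in this sketch.
                }
            } catch (SocketTimeoutException e) {
                // The server kept the connection open; stop waiting.
            }
        }
    }

    public static void main(String[] args) throws IOException {
        // Assumed target: a Flume agent running locally with the configuration above.
        sendLines("localhost", 44444, List.of("hello", "12345"));
    }
}
```

Draining the replies before closing avoids closing the socket while the server is still writing its acknowledgements.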
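2. Data classification example
Requirement: use Flume to collect data and classify it, sending lines that start with a digit to one Kafka topic and lines that start with a letter to another topic.
Flume configuration file: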
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444
# Describe the sink
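# Core setting: use the KafkaSink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# Target Kafka topic; if kafka.topic is not set, the sink falls back to default-flume-topic
# If an event carries a "topic" header, KafkaSink sends it to that topic instead of the one configured here
# The partition is not chosen by the topic header; if a "key" header is present, Kafka uses it to pick the partition
a1.sinks.k1.kafka.topic = first
# Kafka cluster to connect to
a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
# Flume acts as a Kafka producer here, so producer-style parameters are configured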
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
# Attach interceptor i1 to source r1
a1.sources.r1.interceptors = i1
# Fully qualified name of the interceptor's Builder; Builder is a nested class, so it is joined with $ instead of .
a1.sources.r1.interceptors.i1.type = com.aran.flume.MyInterceptor$Builder
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
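The header override described in the sink comments can be pictured as a simple lookup. The sketch below is not Flume's implementation; the class TopicResolution and the method resolveTopic are hypothetical names used only to illustrate the rule that a "topic" header, when present, takes precedence over the configured kafka.topic value.

```java
import java.util.Map;

// Illustration only: mirrors the documented rule, not KafkaSink's actual code.
final class TopicResolution {

    // Returns the topic from the event headers if present, otherwise the configured topic.
    static String resolveTopic(Map<String, String> headers, String configuredTopic) {
        String fromHeader = headers.get("topic");
        return fromHeader != null ? fromHeader : configuredTopic;
    }

    public static void main(String[] args) {
        // An event tagged by an interceptor goes to the header's topic.
        System.out.println(resolveTopic(Map.of("topic", "second"), "first"));
        // An untagged event goes to the topic configured on the sink.
        System.out.println(resolveTopic(Map.of(), "first"));
    }
}
```

This is why the interceptor only has to set a header: the sink configuration stays the same for both topics.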
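Interceptor code:
package com.aran.flume;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

public class MyInterceptor implements Interceptor {

    // Called once by Flume before the interceptor is used; nothing to set up here.
    @Override
    public void initialize() {
    }

    // Handles a single event. Returning null drops the event.
    @Override
    public Event intercept(Event event) {
        // Three cases: the line starts with a letter, with a digit, or with neither.
        byte[] body = event.getBody();
        Map<String, String> headers = event.getHeaders();
        // Decode the body as a UTF-8 string.
        String line = new String(body, StandardCharsets.UTF_8);
        // An empty line has no first character, so drop it instead of calling charAt(0).
        if (line.isEmpty()) {
            return null;
        }
        // Look at the first character.
        char c = line.charAt(0);
        if ((c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z')) {
            // Starts with a letter: route to topic "first" through the header.
            headers.put("topic", "first");
        } else if (c >= '0' && c <= '9') {
            // Starts with a digit: route to topic "second" through the header.
            headers.put("topic", "second");
        } else {
            // Neither a letter nor a digit: drop the event.
            return null;
        }
        return event;
    }

    // Handles a batch of events. The returned list must not be longer than the input
    // and must never be null; return an empty list to drop every event.
    @Override
    public List<Event> intercept(List<Event> list) {
        List<Event> kept = new ArrayList<>(list.size());
        for (Event event : list) {
            Event result = intercept(event);
            // Leave out events that the single-event method rejected.
            if (result != null) {
                kept.add(result);
            }
        }
        return kept;
    }

    // Releases any resources held by the interceptor; nothing to release here.
    @Override
    public void close() {
    }

    // Builder is a static nested class so that Flume can instantiate it by name
    // without first needing an instance of MyInterceptor.
    public static class Builder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            return new MyInterceptor();
        }

        // Receives the interceptor's settings from the agent configuration file; unused here.
        @Override
        public void configure(Context context) {
        }
    }
}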
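The classification rule can be checked without a running agent by isolating it from the Flume API. The following is a minimal sketch under that assumption: TopicClassifier, topicFor, and keepRoutable are hypothetical names, and plain strings stand in for Flume events. It applies the same three-way rule (letter, digit, neither) and shows that a batch only keeps the lines that received a topic.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Stand-alone illustration of the routing rule; it does not depend on Flume.
final class TopicClassifier {

    // Returns the target topic for a raw body, or empty if the event should be dropped.
    static Optional<String> topicFor(byte[] body) {
        if (body == null || body.length == 0) {
            return Optional.empty();
        }
        char c = new String(body, StandardCharsets.UTF_8).charAt(0);
        if ((c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z')) {
            return Optional.of("first");
        }
        if (c >= '0' && c <= '9') {
            return Optional.of("second");
        }
        return Optional.empty();
    }

    // Batch version: keeps only the lines that map to a topic, preserving order.
    static List<String> keepRoutable(List<String> lines) {
        List<String> kept = new ArrayList<>();
        for (String line : lines) {
            if (topicFor(line.getBytes(StandardCharsets.UTF_8)).isPresent()) {
                kept.add(line);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        for (String line : List.of("abc", "123", "#comment", "")) {
            System.out.println("[" + line + "] -> " + topicFor(line.getBytes(StandardCharsets.UTF_8)));
        }
    }
}
```

Returning an Optional instead of null makes the "drop" case explicit, which is easy to overlook in a batch method that iterates over events and ignores the return value.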