Flume对接kafka

Flume对接kafka1.简单的对接需求:使用收集控制台的信息存储再kafka并用kafka消费者消费Flume配置文件:#Namethecomponentsonthisagenta1.sources=r1a1.sinks=k1a1.channels=c1#Describe/configurethesourcea1.sources.r1.type=netcata1.sources.r1.bind=0.0.0.0a1.sources.r1.port=444

大家好,欢迎来到IT知识分享网。

1.简单的对接

需求:

使用收集控制台的信息存储再kafka并用kafka消费者消费

Flume配置文件:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444

# Describe the sink
#核心配置kafkaSink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
#选择kafka主题,默认会在Defale_kafka主题中
a1.sinks.k1.kafka.topic = first
#要连接的kafka集群
a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
#因为flume相当于kafka生产者,所以要配置类似参数
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1


# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100


# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

实验:

开启消费者

Flume对接kafka

开启flume

Flume对接kafka

连接端口并发送数据Flume对接kafka

观察可以收到

Flume对接kafka

2.数据分类案例

需求:使用flume收集数据并分类,将数字开头数据发到一个kafka话题,字母开头的数据发送到另一个话题

Flume配置文件:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444

# Describe the sink
#核心配置kafkaSink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
#选择kafka主题,默认会在Defale_kafka主题中
#flume中对KafkaSink规定了,如果event头信息中携带了topic属性,会覆盖此属性
#且会根据topic键的value确定数据会发送的分区
a1.sinks.k1.kafka.topic = first
#要连接的kafka集群
a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
#因为flume相当于kafka生产者,所以要配置类似参数
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1

#连接sources连接interceptors(i1)
a1.sources.r1.interceptors = i1
#下面写内部类的全类名,因为是子类所以要把.换成$
a1.sources.r1.interceptors.i1.type = com.aran.flume.MyInterceptor$Builder


# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100


# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

 拦截器代码:

public class MyInterceptor implements Interceptor {
    //初始化方法,flume在用interceptor之前
    public void initialize() {

    }

    //核心方法,处理单个event,返回null则丢失这个event
    public Event intercept(Event event) {
        //三种情况,字母开头,数字开头,不是字母和数字开头
        //获取
        byte[] body = event.getBody();
        Map<String, String> headers = event.getHeaders();
        //将body编译成String
        String line = new String(body, StandardCharsets.UTF_8);
        //获取第一个字母
        char c = line.charAt(0);

        if ((c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z')) {
            //字母开头,在头部添加内容
            headers.put("topic", "first");

        } else if (c >= '0' && c <= '9') {
            //数字开头,在头部添加内容
            headers.put("topic", "second");
        } else {
            //啥也不是
            return null;
        }

        return event;
    }

    //核心方法,处理单个event,输出的长度一定不能比输入的长度大
    //不能返回null,如果所有的都不想要就返回空的list
    public List<Event> intercept(List<Event> list) {
        for (Event event : list) {
            intercept(event);

        }
        return list;
    }

    //关闭流
    public void close() {

    }

    //static的原因是因为是子类,要让flume先找到就要用静态先加载
    public static class Builder implements Interceptor.Builder {
        public Interceptor build() {
            return new MyInterceptor();
        }
        //配置方法,可以从配置文件中配置我们的Interceptor,也就是配置文件
        public void configure(Context context) {

        }
    }
}

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/14288.html

(0)
上一篇 2024-02-25 07:45
下一篇 2024-03-10 13:45

相关推荐

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注

关注微信