Flume: reading netcat data and writing it to logger, HDFS and Kafka at the same time

Published 2021-05-30 02:19


  • Java code: a custom interceptor that tags each event with a "type" header

package kb11;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

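import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * @Author Xulihua
 * @Date 2021/5/25
 * @Description
 * Classifies each event received by the source.
 * An event consists of headers and a body.
 * If the body starts with "hello", the event gets the header type=hello.
 * If the body starts with "hi", the event gets the header type=hi.
 * Any other event gets the header type=other.
 */
public class InterceptorDmo implements Interceptor {
    ArrayList<Event> addHeaderEvents=null;
    @Override
    public void initialize() {
        addHeaderEvents= new ArrayList<>();
    }

    @Override
    public Event intercept(Event event) {
        Map<String, String> headers = event.getHeaders();
        // Tag the event based on its body; decode explicitly as UTF-8 instead of the platform default charset
        byte[] body = event.getBody();
        String bodyStr = new String(body, StandardCharsets.UTF_8);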
        if (bodyStr.startsWith("hello")){
            headers.put("type","hello");
        }else if (bodyStr.startsWith("hi")){
            headers.put("type","hi");
        }else{
            headers.put("type","other");
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        addHeaderEvents.clear(); // empty the list reused from the previous batch
        for (Event event : list) {
            Event opEvent = intercept(event);
            addHeaderEvents.add(opEvent);
        }
        return addHeaderEvents;
    }

    @Override
    public void close() {
        addHeaderEvents.clear();
        addHeaderEvents=null;
    }

    // Static nested Builder: the config file references it as kb11.InterceptorDmo$Builder
    public static class Builder implements Interceptor.Builder{

        @Override
        public Interceptor build() {
            return new InterceptorDmo();
        }

        @Override
        public void configure(Context context) {
            // This interceptor takes no configuration parameters
        }
    }
}
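The tagging rule can be checked without a running agent. The sketch below is a hypothetical, Flume-free copy of the same prefix logic (the class `TypeTagger` and its method `classify` are illustrative names, not part of the original project). It also makes two properties of `startsWith` visible: the match is case-sensitive, and any body beginning with the letters "hi", such as "history", is tagged `hi`.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical stand-alone copy of the interceptor's tagging rule, for testing without Flume. */
public class TypeTagger {

    /** Returns the value the interceptor would put in the "type" header for this body. */
    public static String classify(byte[] body) {
        String text = new String(body, StandardCharsets.UTF_8);
        if (text.startsWith("hello")) {
            return "hello";
        } else if (text.startsWith("hi")) {
            return "hi";
        }
        return "other";
    }

    public static void main(String[] args) {
        String[] samples = {"hello flume", "hi kafka", "history", "Hello", "bye"};
        for (String s : samples) {
            // Prints each sample with the tag it would receive
            System.out.println(s + " -> " + classify(s.getBytes(StandardCharsets.UTF_8)));
        }
    }
}
```

Running `main` prints `hello flume -> hello`, `hi kafka -> hi`, `history -> hi`, `Hello -> other` and `bye -> other`. If "history" should not land in the Kafka topic, the rule would need a stricter check such as matching a whole first word.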

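  • A thin jar (without bundled dependencies) is enough, because flume/lib already contains the Flume classes the interceptor depends on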

In the Maven Projects panel in the IDE's right sidebar, run package to build the jar.


Find the jar under target in the left sidebar and copy it into flume/lib.

- Create a new configuration file in Flume's conf directory (this walkthrough uses conf/kb11job/)

- vi netcat-flume-interceptor.conf

# Component names
interceptordemo.sources=interceptorDemoSource
interceptordemo.channels=hellochannel hichannel otherchannel
interceptordemo.sinks=hellosink hisink othersink
#source
interceptordemo.sources.interceptorDemoSource.type=netcat
interceptordemo.sources.interceptorDemoSource.bind=localhost
interceptordemo.sources.interceptorDemoSource.port=44444
interceptordemo.sources.interceptorDemoSource.interceptors=interceptor1
interceptordemo.sources.interceptorDemoSource.interceptors.interceptor1.type=kb11.InterceptorDmo$Builder
interceptordemo.sources.interceptorDemoSource.selector.type=multiplexing
interceptordemo.sources.interceptorDemoSource.selector.mapping.hello=hellochannel
interceptordemo.sources.interceptorDemoSource.selector.mapping.hi=hichannel
interceptordemo.sources.interceptorDemoSource.selector.mapping.other=otherchannel
interceptordemo.sources.interceptorDemoSource.selector.header=type
#channel
interceptordemo.channels.hellochannel.type=memory
interceptordemo.channels.hellochannel.capacity=1000
interceptordemo.channels.hellochannel.transactionCapacity=100

interceptordemo.channels.hichannel.type=memory
interceptordemo.channels.hichannel.capacity=1000
interceptordemo.channels.hichannel.transactionCapacity=100

interceptordemo.channels.otherchannel.type=memory
interceptordemo.channels.otherchannel.capacity=1000
interceptordemo.channels.otherchannel.transactionCapacity=100
#sink
interceptordemo.sinks.hellosink.type=hdfs
interceptordemo.sinks.hellosink.hdfs.fileType=DataStream
interceptordemo.sinks.hellosink.hdfs.filePrefix=hello
interceptordemo.sinks.hellosink.hdfs.fileSuffix=.csv
interceptordemo.sinks.hellosink.hdfs.path=hdfs://192.168.107.103:9000/kb11file/hello/%Y-%m-%d
interceptordemo.sinks.hellosink.hdfs.useLocalTimeStamp=true
interceptordemo.sinks.hellosink.hdfs.batchSize=640
interceptordemo.sinks.hellosink.hdfs.rollCount=0
interceptordemo.sinks.hellosink.hdfs.rollSize=6400000
interceptordemo.sinks.hellosink.hdfs.rollInterval=3

interceptordemo.sinks.hisink.type=org.apache.flume.sink.kafka.KafkaSink
interceptordemo.sinks.hisink.batchSize=640
interceptordemo.sinks.hisink.brokerList=192.168.107.103:9092
interceptordemo.sinks.hisink.topic=hi

interceptordemo.sinks.othersink.type=logger

# Bind the source to its channels and each sink to one channel
interceptordemo.sources.interceptorDemoSource.channels=hellochannel hichannel otherchannel
interceptordemo.sinks.hellosink.channel=hellochannel
interceptordemo.sinks.hisink.channel=hichannel
interceptordemo.sinks.othersink.channel=otherchannel
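With `selector.type=multiplexing`, the source reads the header named by `selector.header` (here `type`) and writes the event to the channel listed under the matching `selector.mapping.<value>` key. The Java model below only illustrates that lookup; it is not Flume's selector implementation, and the class `SelectorModel` and its methods are hypothetical. In Flume, a header value with no mapping goes to the channels in `selector.default`; this config does not set that property, which is harmless here because the interceptor always writes one of the three mapped values.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Simplified model of the multiplexing selector configured above (illustration only, Java 9+). */
public class SelectorModel {
    private final String headerName;
    private final Map<String, List<String>> mapping = new HashMap<>();
    private final List<String> defaultChannels;

    public SelectorModel(String headerName, List<String> defaultChannels) {
        this.headerName = headerName;
        this.defaultChannels = defaultChannels;
    }

    /** Equivalent of one selector.mapping.<value>=<channels> line. */
    public void map(String headerValue, String... channels) {
        mapping.put(headerValue, List.of(channels));
    }

    /** Channels an event with these headers would be written to. */
    public List<String> route(Map<String, String> headers) {
        String value = headers.get(headerName);
        List<String> channels = value == null ? null : mapping.get(value);
        return channels != null ? channels : defaultChannels;
    }

    public static void main(String[] args) {
        // Mirrors the selector.* lines of netcat-flume-interceptor.conf (no selector.default there)
        SelectorModel selector = new SelectorModel("type", Collections.emptyList());
        selector.map("hello", "hellochannel");
        selector.map("hi", "hichannel");
        selector.map("other", "otherchannel");

        System.out.println(selector.route(Map.of("type", "hello"))); // [hellochannel]
        System.out.println(selector.route(Map.of("type", "hi")));    // [hichannel]
        System.out.println(selector.route(Map.of()));                 // [] (no header, no default)
    }
}
```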

- Start the components in this order:

  1. Kafka: create the topic hi, then start a console consumer that waits for messages

kafka-topics.sh --zookeeper 192.168.107.103:2181 --create --topic hi --partitions 1 --replication-factor 1

kafka-console-consumer.sh --topic hi --bootstrap-server 192.168.107.103:9092

  2. Start the Flume agent with flume-ng

./bin/flume-ng agent --name interceptordemo --conf ./conf/ --conf-file ./conf/kb11job/netcat-flume-interceptor.conf -Dflume.root.logger=INFO,console

  3. Send input to the netcat source, typing lines that start with hello, with hi, or with anything else:

telnet localhost 44444
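Logger output (events tagged other): [screenshot]
Kafka output (events tagged hi, read by the consumer from step 1): [screenshot]
HDFS storage (events tagged hello): [screenshot]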
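To locate the hello events in HDFS, note that `hdfs.path` contains the escape sequence `%Y-%m-%d`, which `useLocalTimeStamp=true` fills in from the agent's local clock, and that the three `roll*` settings decide when a file is closed. The sketch below models both for the `hellosink` settings. The class `HdfsSinkModel` is hypothetical, and its roll check is a simplified reading of the documented meaning of `rollInterval` (seconds), `rollSize` (bytes) and `rollCount` (events), where 0 disables that trigger; it is not the sink's actual code.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

/** Hypothetical model of the hellosink path and roll settings (not Flume code). */
public class HdfsSinkModel {
    static final String BASE = "hdfs://192.168.107.103:9000/kb11file/hello/";
    static final long ROLL_INTERVAL_SECONDS = 3;   // hdfs.rollInterval
    static final long ROLL_SIZE_BYTES = 6_400_000; // hdfs.rollSize
    static final long ROLL_COUNT = 0;              // hdfs.rollCount (0 = never roll by event count)

    /** Directory that %Y-%m-%d expands to for the given local date. */
    static String directoryFor(LocalDate date) {
        return BASE + date.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
    }

    /** True if any enabled trigger says the currently open file should be closed. */
    static boolean shouldRoll(long secondsOpen, long bytesWritten, long eventsWritten) {
        boolean byTime = ROLL_INTERVAL_SECONDS > 0 && secondsOpen >= ROLL_INTERVAL_SECONDS;
        boolean bySize = ROLL_SIZE_BYTES > 0 && bytesWritten >= ROLL_SIZE_BYTES;
        boolean byCount = ROLL_COUNT > 0 && eventsWritten >= ROLL_COUNT;
        return byTime || bySize || byCount;
    }

    public static void main(String[] args) {
        // prints hdfs://192.168.107.103:9000/kb11file/hello/2021-05-30
        System.out.println(directoryFor(LocalDate.of(2021, 5, 30)));
        System.out.println(shouldRoll(1, 500, 10)); // false: no enabled trigger reached yet
        System.out.println(shouldRoll(3, 500, 10)); // true: the 3-second interval has elapsed
    }
}
```

With these settings time is effectively the only trigger in a small test, so typing a few hello lines produces a new file roughly every 3 seconds while events keep arriving. While a file is still open it carries the sink's default in-use suffix `.tmp`.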




Author: 我是个大美女

Link: http://www.javaheidong.com/blog/article/208057/4cce1684a18df98d7dcc/

Source: java黑洞网

Any reproduction must credit the source; infringements will be pursued.
