发布于2021-05-30 02:19 阅读(1250) 评论(0) 点赞(3) 收藏(4)
package kb11;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
/**
 * Flume interceptor that classifies every event received by the source and
 * records the result in the event's {@code type} header.
 *
 * <ul>
 *   <li>body starts with {@code hello} -&gt; header {@code type=hello}</li>
 *   <li>body starts with {@code hi} -&gt; header {@code type=hi}</li>
 *   <li>anything else -&gt; header {@code type=other}</li>
 * </ul>
 *
 * <p>The interceptor keeps no per-instance state, so its methods do not depend
 * on {@link #initialize()} having been called and there is no shared buffer
 * that one batch could overwrite while another is still in use.
 *
 * <p>Created 2021/5/25.
 *
 * @author Xulihua
 */
public class InterceptorDmo implements Interceptor {

    /** Name of the header written on every event. */
    private static final String TYPE_HEADER = "type";

    /** Body prefix (and header value) for "hello" events. */
    private static final String HELLO = "hello";

    /** Body prefix (and header value) for "hi" events. */
    private static final String HI = "hi";

    /** Header value for events matching neither prefix. */
    private static final String OTHER = "other";

    /** Nothing to set up: the interceptor is stateless. */
    @Override
    public void initialize() {
        // Intentionally empty.
    }

    /**
     * Tags a single event with the {@code type} header according to the prefix
     * of its body.
     *
     * @param event the event to tag; its header map is modified in place
     * @return the same {@code event} instance, with the {@code type} header set
     */
    @Override
    public Event intercept(Event event) {
        Map<String, String> headers = event.getHeaders();
        byte[] body = event.getBody();
        // Decode with a fixed charset so classification does not vary with the
        // JVM's platform default; a missing body is treated as empty text.
        String bodyStr = (body == null) ? "" : new String(body, StandardCharsets.UTF_8);
        if (bodyStr.startsWith(HELLO)) {
            headers.put(TYPE_HEADER, HELLO);
        } else if (bodyStr.startsWith(HI)) {
            headers.put(TYPE_HEADER, HI);
        } else {
            headers.put(TYPE_HEADER, OTHER);
        }
        return event;
    }

    /**
     * Tags every event of a batch by delegating to {@link #intercept(Event)}.
     *
     * @param list the batch of events to tag
     * @return a new list, independent of any previous call, holding the tagged
     *         events in their original order
     */
    @Override
    public List<Event> intercept(List<Event> list) {
        // A fresh list per call: the result stays valid for the caller even
        // after the next batch is processed.
        List<Event> tagged = new ArrayList<>(list.size());
        for (Event event : list) {
            tagged.add(intercept(event));
        }
        return tagged;
    }

    /** Nothing to release: the interceptor is stateless. */
    @Override
    public void close() {
        // Intentionally empty.
    }

    /**
     * Builder named in the agent configuration
     * ({@code kb11.InterceptorDmo$Builder}) to create the interceptor.
     */
    public static class Builder implements Interceptor.Builder {

        /**
         * Creates the interceptor.
         *
         * @return a new {@link InterceptorDmo}
         */
        @Override
        public Interceptor build() {
            return new InterceptorDmo();
        }

        /**
         * No configuration properties are read.
         *
         * @param context the interceptor's configuration context (unused)
         */
        @Override
        public void configure(Context context) {
            // Intentionally empty.
        }
    }
}
在右边栏 MavenProject 打jar包
在左边栏target找到jar包 拖到 flume/lib 下
- vi netcat-flume-interceptor.conf
#定义
interceptordemo.sources=interceptorDemoSource
interceptordemo.channels=hellochannel hichannel otherchannel
interceptordemo.sinks=hellosink hisink othersink
#source
interceptordemo.sources.interceptorDemoSource.type=netcat
interceptordemo.sources.interceptorDemoSource.bind=localhost
interceptordemo.sources.interceptorDemoSource.port=44444
interceptordemo.sources.interceptorDemoSource.interceptors=interceptor1
interceptordemo.sources.interceptorDemoSource.interceptors.interceptor1.type=kb11.InterceptorDmo$Builder
interceptordemo.sources.interceptorDemoSource.selector.type=multiplexing
interceptordemo.sources.interceptorDemoSource.selector.mapping.hello=hellochannel
interceptordemo.sources.interceptorDemoSource.selector.mapping.hi=hichannel
interceptordemo.sources.interceptorDemoSource.selector.mapping.other=otherchannel
interceptordemo.sources.interceptorDemoSource.selector.header=type
#channel
interceptordemo.channels.hellochannel.type=memory
interceptordemo.channels.hellochannel.capacity=1000
interceptordemo.channels.hellochannel.transactionCapacity=100
interceptordemo.channels.hichannel.type=memory
interceptordemo.channels.hichannel.capacity=1000
interceptordemo.channels.hichannel.transactionCapacity=100
interceptordemo.channels.otherchannel.type=memory
interceptordemo.channels.otherchannel.capacity=1000
interceptordemo.channels.otherchannel.transactionCapacity=100
#sink
interceptordemo.sinks.hellosink.type=hdfs
interceptordemo.sinks.hellosink.hdfs.fileType=DataStream
interceptordemo.sinks.hellosink.hdfs.filePrefix=hello
interceptordemo.sinks.hellosink.hdfs.fileSuffix=.csv
interceptordemo.sinks.hellosink.hdfs.path=hdfs://192.168.107.103:9000/kb11file/hello/%Y-%m-%d
interceptordemo.sinks.hellosink.hdfs.useLocalTimeStamp=true
interceptordemo.sinks.hellosink.hdfs.batchSize=640
interceptordemo.sinks.hellosink.hdfs.rollCount=0
interceptordemo.sinks.hellosink.hdfs.rollSize=6400000
interceptordemo.sinks.hellosink.hdfs.rollInterval=3
interceptordemo.sinks.hisink.type=org.apache.flume.sink.kafka.KafkaSink
interceptordemo.sinks.hisink.batchSize=640
interceptordemo.sinks.hisink.brokerList=192.168.107.103:9092
interceptordemo.sinks.hisink.topic=hi
interceptordemo.sinks.othersink.type=logger
#source连接 channel sink连接channel
interceptordemo.sources.interceptorDemoSource.channels=hellochannel hichannel otherchannel
interceptordemo.sinks.hellosink.channel=hellochannel
interceptordemo.sinks.hisink.channel=hichannel
interceptordemo.sinks.othersink.channel=otherchannel
kafka-topics.sh --zookeeper 192.168.107.103:2181 --create --topic hi --partitions 1 --replication-factor 1
kafka-console-consumer.sh --topic hi --bootstrap-server 192.168.107.103:9092 hi flume
./bin/flume-ng agent --name interceptordemo --conf ./conf/ --conf-file ./conf/kb11job/netcat-flume-interceptor.conf -Dflume.root.logger=INFO,console
telnet localhost 44444
logger输出
Kafka输出
Hdfs存储
作者:我是个大美女
链接:http://www.javaheidong.com/blog/article/208057/4cce1684a18df98d7dcc/
来源:java黑洞网
任何形式的转载都请注明出处,如有侵权 一经发现 必将追究其法律责任
昵称:
评论内容:(最多支持255个字符)
---无人问津也好,技不如人也罢,你都要试着安静下来,去做自己该做的事,而不是让内心的烦躁、焦虑,坏掉你本来就不多的热情和定力
Copyright © 2018-2021 java黑洞网 All Rights Reserved 版权所有,并保留所有权利。京ICP备18063182号-2
投诉与举报,广告合作请联系vgs_info@163.com或QQ3083709327
免责声明:网站文章均由用户上传,仅供读者学习交流使用,禁止用做商业用途。若文章涉及色情,反动,侵权等违法信息,请向我们举报,一经核实我们会立即删除!