Flume source, channel, and sink configurations

source:


    1. Sequence (seq) source: mostly used for testing
        # Name the agent components
        a1.sources = r1
        a1.sinks = k1
        a1.channels = c1

        # Configure the source
        a1.sources.r1.type = seq
        # Total number of events to send
        a1.sources.r1.totalEvents = 1000

        # Configure the sink
        a1.sinks.k1.type = logger

        # Configure the channel
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 1000
        a1.channels.c1.transactionCapacity = 100

        # Bind the source and sink to the channel
        a1.sources.r1.channels = c1
        a1.sinks.k1.channel = c1
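
        Every config in this document is launched the same way; a minimal sketch, assuming the properties above are saved as seq.conf (the file name and $FLUME_HOME are placeholders):

        # start agent a1 and print logger-sink events to the console
        flume-ng agent --conf $FLUME_HOME/conf --conf-file seq.conf --name a1 -Dflume.root.logger=INFO,console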

    2. Stress source: mostly used for load testing
        # Name the agent components
        a1.sources = r1
        a1.sinks = k1
        a1.channels = c1

        # Configure the source
        a1.sources.r1.type = org.apache.flume.source.StressSource
        # Size of a single event, in bytes
        a1.sources.r1.size = 10240
        # Total number of events to send
        a1.sources.r1.maxTotalEvents = 1000000

        # Configure the sink
        a1.sinks.k1.type = logger

        # Configure the channel
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 1000
        a1.channels.c1.transactionCapacity = 100

        # Bind the source and sink to the channel
        a1.sources.r1.channels = c1
        a1.sinks.k1.channel = c1

    3. Spooling directory (spooldir) source: watches a directory for new files and sends each new file's contents as events
        # Name the agent components
        a1.sources = r1
        a1.sinks = k1
        a1.channels = c1

        # Configure the source
        a1.sources.r1.type = spooldir
        # Directory to watch
        a1.sources.r1.spoolDir = /home/centos/spooldir

        # Suffix appended to a file once it has been fully consumed
        #a1.sources.r1.fileSuffix = .COMPLETED

        # Configure the sink
        a1.sinks.k1.type = logger

        # Configure the channel
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 1000
        a1.channels.c1.transactionCapacity = 100

        # Bind the source and sink to the channel
        a1.sources.r1.channels = c1
        a1.sinks.k1.channel = c1
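
        To test, move a finished file into the watched directory; files must not be written to after they land there, which is why mv (rather than a slow copy into place) is the safe pattern:

        echo "hello spooldir" > /tmp/data.txt
        mv /tmp/data.txt /home/centos/spooldir/
        # after consumption the file is renamed data.txt.COMPLETED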

    4. Exec source    // produces data by running a Linux command
            // typical use: tail -F (watch a file and emit lines as they are appended)
            // no delivery guarantee; data may be lost

        # Name the agent components
        a1.sources = r1
        a1.sinks = k1
        a1.channels = c1

        # Configure the source
        a1.sources.r1.type = exec
        # The Linux command to run
        a1.sources.r1.command = tail -F /home/centos/readme.txt

        # Configure the sink
        a1.sinks.k1.type = logger

        # Configure the channel
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 1000
        a1.channels.c1.transactionCapacity = 100

        # Bind the source and sink to the channel
        a1.sources.r1.channels = c1
        a1.sinks.k1.channel = c1
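
        To test, append to the tailed file; each appended line becomes one event:

        echo "new log line" >> /home/centos/readme.txt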

    5. Taildir source        // watches files under a directory
                // files to watch are selected by regex
                // fault-tolerant: progress is saved in a position file

        # Name the agent components
        a1.sources = r1
        a1.sinks = k1
        a1.channels = c1

        # Configure the source type
        a1.sources.r1.type = TAILDIR
        # Define the file groups; multiple groups may be listed
        a1.sources.r1.filegroups = f1
        # Directory and file pattern for each group, given as a regex; only files can be watched
        a1.sources.r1.filegroups.f1 = /home/centos/taildir/.*

        # Location of the position file (metadata recording how far each file has been read)
        # a1.sources.r1.positionFile = ~/.flume/taildir_position.json

        # Configure the sink
        a1.sinks.k1.type = logger

        # Configure the channel
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 1000
        a1.channels.c1.transactionCapacity = 100

        # Bind the source and sink to the channel
        a1.sources.r1.channels = c1
        a1.sinks.k1.channel = c1
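
        The position file is what makes restart recovery possible: it stores the byte offset reached in each tracked file. Its content looks roughly like this (the inode, pos, and file values are illustrative):

        [{"inode":2496272,"pos":1234,"file":"/home/centos/taildir/app.log"}]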

 

sink:



    1. File sink (file_roll)    // often used to collect data into local files
        # Name the agent components
        a1.sources = r1
        a1.sinks = k1
        a1.channels = c1

        # Configure the source
        a1.sources.r1.type = netcat
        a1.sources.r1.bind = localhost
        a1.sources.r1.port = 8888

        # Configure the sink
        a1.sinks.k1.type = file_roll
        # Target directory
        a1.sinks.k1.sink.directory = /home/centos/file
        # Roll interval in seconds; default 30, 0 disables rolling so all data goes to a single file
        a1.sinks.k1.sink.rollInterval = 0

        # Configure the channel
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 1000
        a1.channels.c1.transactionCapacity = 100

        # Bind the source and sink to the channel
        a1.sources.r1.channels = c1
        a1.sinks.k1.channel = c1
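
        To test, connect to the netcat source and type a few lines; each line becomes one event and lands in the target directory:

        nc localhost 8888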

    2. HDFS sink        // writes SequenceFiles by default
                // key:   LongWritable
                // value: BytesWritable
        # Name the agent components
        a1.sources = r1
        a1.sinks = k1
        a1.channels = c1

        # Configure the source
        a1.sources.r1.type = netcat
        a1.sources.r1.bind = localhost
        a1.sources.r1.port = 8888

        # Configure the sink
        a1.sinks.k1.type = hdfs
        # Target directory; %y-%m-%d are escape sequences expanded from the event timestamp
        a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/
        # File name prefix
        a1.sinks.k1.hdfs.filePrefix = events-
        # Roll interval in seconds; 0 disables time-based rolling
        a1.sinks.k1.hdfs.rollInterval = 0
        # File size, in bytes, that triggers a roll
        a1.sinks.k1.hdfs.rollSize = 1024
        # Use local time for the escapes (needed here because netcat events carry no timestamp header)
        a1.sinks.k1.hdfs.useLocalTimeStamp = true
        # Output file type; default is SequenceFile
        # DataStream: plain text, no compression codec may be set
        # CompressedStream: compressed text, a codec must be set
        a1.sinks.k1.hdfs.fileType = DataStream

        # Configure the channel
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 1000
        a1.channels.c1.transactionCapacity = 100

        # Bind the source and sink to the channel
        a1.sources.r1.channels = c1
        a1.sinks.k1.channel = c1
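
        To verify, send a few lines via nc localhost 8888, then list and read the output (the date directory below is a placeholder for the expanded %y-%m-%d value):

        hdfs dfs -ls /flume/events/
        hdfs dfs -cat /flume/events/<yy-mm-dd>/events-*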

    3. Hive sink:        // HiveServer help: hive --service help
                // 1. Start the Hive metastore service with `hive --service metastore`; metastore address: thrift://localhost:9083
                // 2. Copy the HCatalog dependencies into Hive's lib directory: cp hive-hcatalog* /soft/hive/lib    (found under /soft/hive/hcatalog/share/hcatalog)
                // 3. Create a transactional Hive table; transactions require these session settings:
                  SET hive.support.concurrency=true;
                  SET hive.enforce.bucketing=true;
                  SET hive.exec.dynamic.partition.mode=nonstrict;
                  SET hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
                  SET hive.compactor.initiator.on=true;
                  SET hive.compactor.worker.threads=1;

                  create table myhive.weblogs(id int, name string, age int)
                  clustered by(id) into 2 buckets
                  row format delimited
                  fields terminated by '\t'
                  stored as orc
                  tblproperties('transactional'='true');

        # Name the agent components
        a1.sources = r1
        a1.sinks = k1
        a1.channels = c1

        # Configure the source
        a1.sources.r1.type = netcat
        a1.sources.r1.bind = localhost
        a1.sources.r1.port = 8888

        # Configure the sink
        a1.sinks.k1.type = hive
        a1.sinks.k1.hive.metastore = thrift://127.0.0.1:9083
        a1.sinks.k1.hive.database = myhive
        a1.sinks.k1.hive.table = weblogs
        a1.sinks.k1.useLocalTimeStamp = true
        # Input format: DELIMITED or json
        # DELIMITED    plain delimited text
        # json         JSON events
        a1.sinks.k1.serializer = DELIMITED
        # Input field delimiter, in double quotes
        a1.sinks.k1.serializer.delimiter = ","
        # Output (serde) field separator, in single quotes
        a1.sinks.k1.serializer.serdeSeparator = '\t'
        # Field names, comma-separated, with no spaces
        a1.sinks.k1.serializer.fieldnames = id,name,age

        # Configure the channel
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 1000
        a1.channels.c1.transactionCapacity = 100

        # Bind the source and sink to the channel
        a1.sources.r1.channels = c1
        a1.sinks.k1.channel = c1
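
        To test end to end, send a record whose delimiter and field order match the serializer settings, then query the table from the Hive CLI:

        echo "1,tom,20" | nc localhost 8888

        -- in the Hive CLI
        select * from myhive.weblogs;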

    4. HBase sink            // SimpleHbaseEventSerializer uses built-in defaults for the row key and column; they cannot be customized
                    // RegexHbaseEventSerializer lets you specify the row key and column names yourself

        # Name the agent components
        a1.sources = r1
        a1.sinks = k1
        a1.channels = c1

        # Configure the source
        a1.sources.r1.type = netcat
        a1.sources.r1.bind = localhost
        a1.sources.r1.port = 8888

        # Configure the sink
        a1.sinks.k1.type = hbase
        a1.sinks.k1.table = flume_hbase
        a1.sinks.k1.columnFamily = f1
        a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer

        # Map columns from regex capture groups
        # rowKeyIndex selects which captured field becomes the row key; indexing starts at 0
        a1.sinks.k1.serializer.colNames = ROW_KEY,name,age
        a1.sinks.k1.serializer.regex = (.*),(.*),(.*)
        a1.sinks.k1.serializer.rowKeyIndex = 0

        # Configure the channel
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 1000
        a1.channels.c1.transactionCapacity = 100

        # Bind the source and sink to the channel
        a1.sources.r1.channels = c1
        a1.sinks.k1.channel = c1
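
        The target table must exist before the agent starts (the same applies to the asynchbase sink below). A sketch: create it in the HBase shell, send a record matching the regex, then scan:

        # in the HBase shell
        create 'flume_hbase', 'f1'

        # the first capture group (row1) becomes the row key; name and age become columns in f1
        echo "row1,tom,20" | nc localhost 8888

        # back in the HBase shell
        scan 'flume_hbase'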

    5. Async HBase sink        // asynchronous HBase sink
                    // the asynchronous mechanism makes writes faster
        # Name the agent components
        a1.sources = r1
        a1.sinks = k1
        a1.channels = c1

        # Configure the source
        a1.sources.r1.type = netcat
        a1.sources.r1.bind = localhost
        a1.sources.r1.port = 8888

        # Configure the sink
        a1.sinks.k1.type = asynchbase
        a1.sinks.k1.table = flume_hbase
        a1.sinks.k1.columnFamily = f1
        a1.sinks.k1.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer

        # Configure the channel
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 1000
        a1.channels.c1.transactionCapacity = 100

        # Bind the source and sink to the channel
        a1.sources.r1.channels = c1
        a1.sinks.k1.channel = c1
       

channel: the buffer between source and sink


    1. Memory channel
        a1.channels.c1.type = memory
        # Maximum number of events held in the buffer
        a1.channels.c1.capacity = 1000
        # Maximum events per transaction, both when the channel takes events from a source
        # and when it hands events to a sink (must not exceed capacity)
        a1.channels.c1.transactionCapacity = 100

    2. File channel:    // if several file channels keep their checkpoint and data
                // in the default locations, they conflict over the same files
                // and crash each other, so give each channel its own directories

        # Name the agent components
        a1.sources = r1
        a1.sinks = k1
        a1.channels = c1

        # Configure the source
        a1.sources.r1.type = netcat
        a1.sources.r1.bind = localhost
        a1.sources.r1.port = 8888

        # Configure the sink
        a1.sinks.k1.type = logger

        # Configure the channel
        a1.channels.c1.type = file
        a1.channels.c1.checkpointDir = /home/centos/flume/checkpoint
        a1.channels.c1.dataDirs = /home/centos/flume/data

        # Bind the source and sink to the channel
        a1.sources.r1.channels = c1
        a1.sinks.k1.channel = c1
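
        When one agent runs several file channels, give each its own directories; a sketch with a hypothetical second channel c2:

        a1.channels.c2.type = file
        a1.channels.c2.checkpointDir = /home/centos/flume/checkpoint2
        a1.channels.c2.dataDirs = /home/centos/flume/data2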

    Memory channel: fast, but data is lost if the machine loses power

    File channel: slower, but data survives even a power failure

 

channel selector:


    A source-side component that routes each event to specific channels, similar to partitioning.

    When one source is wired to multiple channels, the default behavior is to send a copy of each event to every channel.

    1. Replicating channel selector    // the default selector; the source sends an event copy to every channel
                    // setup: source x 1, channel x 3, sink x 3
                    //        netcat     memory        file_roll

        # Name the agent components
        a1.sources = r1
        a1.sinks = k1 k2 k3
        a1.channels = c1 c2 c3

        # Configure the source
        a1.sources.r1.type = netcat
        a1.sources.r1.bind = localhost
        a1.sources.r1.port = 8888
        a1.sources.r1.selector.type = replicating

        # Configure the channels
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 1000
        a1.channels.c1.transactionCapacity = 100

        a1.channels.c2.type = memory
        a1.channels.c2.capacity = 1000
        a1.channels.c2.transactionCapacity = 100

        a1.channels.c3.type = memory
        a1.channels.c3.capacity = 1000
        a1.channels.c3.transactionCapacity = 100

        # Configure the sinks
        a1.sinks.k1.type = file_roll
        a1.sinks.k1.sink.directory = /home/centos/file1
        a1.sinks.k1.sink.rollInterval = 0

        a1.sinks.k2.type = file_roll
        a1.sinks.k2.sink.directory = /home/centos/file2
        a1.sinks.k2.sink.rollInterval = 0

        a1.sinks.k3.type = file_roll
        a1.sinks.k3.sink.directory = /home/centos/file3
        a1.sinks.k3.sink.rollInterval = 0

        # Bind the source and sinks to the channels
        a1.sources.r1.channels = c1 c2 c3
        a1.sinks.k1.channel = c1
        a1.sinks.k2.channel = c2
        a1.sinks.k3.channel = c3
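
        To verify replication, send one line through the netcat source and confirm all three directories received the same copy:

        echo "replicate me" | nc localhost 8888
        cat /home/centos/file1/* /home/centos/file2/* /home/centos/file3/*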
