Code - Chapter 5: Flume, a Log Collection System (from "Data Collection and Preprocessing", by Lin Ziyu)


Chapter 5: Flume, a Log Collection System

> cd c:\apache-flume-1.9.0-bin\bin
> flume-ng version

example.conf

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Configure the sink
a1.sinks.k1.type = logger

# Configure the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
> cd c:\apache-flume-1.9.0-bin
> .\bin\flume-ng agent --conf .\conf --conf-file .\conf\example.conf --name a1 -property flume.root.logger=INFO,console
> telnet localhost 44444
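
Once the telnet session connects, every line typed becomes one Flume event. As a quick check (assuming the netcat source's default acknowledgement behavior), type a line such as:

Hello Flume

The source should reply with OK, and the agent window should log the event body through the logger sink.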

example1.conf

# Name the three components
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Define the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = C:/mylogs/

# Define the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

# Define the sink
a1.sinks.k1.type = logger

# Assemble the source, channel, and sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
> cd c:\apache-flume-1.9.0-bin
> .\bin\flume-ng agent --conf .\conf --conf-file .\conf\example1.conf --name a1 -property flume.root.logger=INFO,console
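
With the agent running, any new file dropped into C:\mylogs is ingested and then renamed with a .COMPLETED suffix (the spooling-directory source's default). Files must be complete before they are placed in the directory, so as a sketch, write the file elsewhere and then move it in (test1.log is a hypothetical name):

> echo Hello Spooldir> %TEMP%\test1.log
> move %TEMP%\test1.log C:\mylogs\test1.log

The event body should appear in the agent console, and C:\mylogs\test1.log should be renamed to test1.log.COMPLETED.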
> cd c:\kafka_2.12-2.4.0
> .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
> cd c:\kafka_2.12-2.4.0
> .\bin\windows\kafka-server-start.bat .\config\server.properties
> cd c:\kafka_2.12-2.4.0
> .\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
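
To confirm the topic exists before wiring Flume to it, list the topics registered on the same ZooKeeper address (flags as in Kafka 2.4):

> .\bin\windows\kafka-topics.bat --list --zookeeper localhost:2181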

kafka.conf

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = test
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy

# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
> cd c:\apache-flume-1.9.0-bin
> .\bin\flume-ng.cmd agent --conf .\conf --conf-file .\conf\kafka.conf --name a1 -property flume.root.logger=INFO,console
> telnet localhost 44444
> cd c:\kafka_2.12-2.4.0
> .\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
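
At this point the chain is: telnet, netcat source, memory channel, KafkaSink, topic test, console consumer. To verify it end to end, type a line such as the following into the telnet window; the same text should be printed by the consumer window almost immediately (linger.ms is set to 1):

Hello Kafka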

spooldir_hdfs.conf

# Name the three components
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1

# Configure the source component
agent1.sources.source1.type = spooldir
agent1.sources.source1.spoolDir = C:/mylogs/
agent1.sources.source1.fileHeader = false

# Configure the sink component
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path = hdfs://localhost:9000/weblog/%y-%m-%d/%H-%M
agent1.sinks.sink1.hdfs.filePrefix = access_log
agent1.sinks.sink1.hdfs.maxOpenFiles = 5000
agent1.sinks.sink1.hdfs.batchSize = 100
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat = Text
agent1.sinks.sink1.hdfs.rollSize = 102400
agent1.sinks.sink1.hdfs.rollCount = 1000000
agent1.sinks.sink1.hdfs.rollInterval = 60
#agent1.sinks.sink1.hdfs.round = true
#agent1.sinks.sink1.hdfs.roundValue = 10
#agent1.sinks.sink1.hdfs.roundUnit = minute
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true

# Set up the channel
agent1.channels.channel1.type = memory
agent1.channels.channel1.keep-alive = 120
agent1.channels.channel1.capacity = 500000
agent1.channels.channel1.transactionCapacity = 600

# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
> cd c:\hadoop-3.1.3\sbin
> start-dfs.cmd
> jps
> cd c:\apache-flume-1.9.0-bin
> .\bin\flume-ng agent --conf .\conf --conf-file .\conf\spooldir_hdfs.conf --name agent1 -property flume.root.logger=INFO,console
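
Files dropped into C:\mylogs now land in HDFS under time-bucketed directories; with these settings a file rolls when it reaches 102400 bytes (about 100 KB), 1000000 events, or 60 seconds, whichever comes first, and useLocalTimeStamp supplies the timestamp for the %y-%m-%d/%H-%M path escapes. To inspect the result (the exact subdirectory depends on the local time when the events arrived):

> hdfs dfs -ls -R /weblog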

exec_hdfs.conf

# Name the three components
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1

# Configure the source component
agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -F C:/mylogs/log1.txt
agent1.sources.source1.channels = channel1

# Configure the sink component
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path = hdfs://localhost:9000/weblog/%y-%m-%d/%H-%M
agent1.sinks.sink1.hdfs.filePrefix = access_log
agent1.sinks.sink1.hdfs.maxOpenFiles = 5000
agent1.sinks.sink1.hdfs.batchSize = 100
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat = Text
agent1.sinks.sink1.hdfs.rollSize = 102400
agent1.sinks.sink1.hdfs.rollCount = 1000000
agent1.sinks.sink1.hdfs.rollInterval = 60
#agent1.sinks.sink1.hdfs.round = true
#agent1.sinks.sink1.hdfs.roundValue = 10
#agent1.sinks.sink1.hdfs.roundUnit = minute
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true

# Configure the channel component
agent1.channels.channel1.type = memory
agent1.channels.channel1.keep-alive = 120
agent1.channels.channel1.capacity = 500000
agent1.channels.channel1.transactionCapacity = 600

# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
> tail -f c:\mylogs\log1.txt
> cd c:\apache-flume-1.9.0-bin
> .\bin\flume-ng agent --conf .\conf --conf-file .\conf\exec_hdfs.conf --name agent1 -property flume.root.logger=INFO,console
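
The exec source simply runs the configured command, so tail -F must be available on the PATH; on Windows this assumes a Unix-style tail is installed (for example, the one shipped with Git for Windows). To generate traffic once the agent is running, append a line to the tailed file and watch it flow into /weblog:

> echo another log line>> c:\mylogs\log1.txt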
https://maven.apache.org/download.cgi
https://github.com/keedio/flume-ng-sql-source/tree/release/1.5.2
> cd C:\flume-ng-sql-source-release-1.5.2
> C:\apache-maven-3.6.3\bin\mvn package
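
The build produces the SQL source jar under the project's target directory. Before Flume can load org.keedio.flume.source.SQLSource, that jar and a MySQL JDBC driver jar must be copied into Flume's lib directory; the file names below are typical but depend on the build output and on the driver version you downloaded:

> copy C:\flume-ng-sql-source-release-1.5.2\target\flume-ng-sql-source-1.5.2.jar c:\apache-flume-1.9.0-bin\lib
> copy C:\mysql-connector-java-5.1.40\mysql-connector-java-5.1.40-bin.jar c:\apache-flume-1.9.0-bin\lib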
mysql> CREATE DATABASE school;
mysql> USE school;
mysql> CREATE TABLE student(
    -> id INT NOT NULL,
    -> name VARCHAR(40),
    -> age INT,
    -> grade INT,
    -> PRIMARY KEY (id));
> cd c:\hadoop-3.1.3\sbin
> start-dfs.cmd
> jps

mysql_hdfs.conf

# Name the three components
agent1.channels = ch1
agent1.sinks = HDFS
agent1.sources = sql-source

# Configure the source component
agent1.sources.sql-source.type = org.keedio.flume.source.SQLSource
agent1.sources.sql-source.hibernate.connection.url = jdbc:mysql://localhost:3306/school
agent1.sources.sql-source.hibernate.connection.user = root
agent1.sources.sql-source.hibernate.connection.password = 123456
agent1.sources.sql-source.hibernate.connection.autocommit = true
agent1.sources.sql-source.table = student
agent1.sources.sql-source.run.query.delay = 5000
agent1.sources.sql-source.status.file.path = C:/apache-flume-1.9.0-bin/
agent1.sources.sql-source.status.file.name = sql-source.status

# Configure the sink component
agent1.sinks.HDFS.type = hdfs
agent1.sinks.HDFS.hdfs.path = hdfs://localhost:9000/flume/mysql
agent1.sinks.HDFS.hdfs.fileType = DataStream
agent1.sinks.HDFS.hdfs.writeFormat = Text
agent1.sinks.HDFS.hdfs.rollSize = 268435456
agent1.sinks.HDFS.hdfs.rollInterval = 0
agent1.sinks.HDFS.hdfs.rollCount = 0

# Set up the channel
agent1.channels.ch1.type = memory

# Bind the source and sink to the channel
agent1.sinks.HDFS.channel = ch1
agent1.sources.sql-source.channels = ch1
> cd c:\apache-flume-1.9.0-bin
> .\bin\flume-ng agent --conf .\conf --conf-file .\conf\mysql_hdfs.conf --name agent1 -property flume.root.logger=INFO,console
mysql> insert into student(id,name,age,grade) values(1,'Xiaoming',23,98);
mysql> insert into student(id,name,age,grade) values(2,'Zhangsan',24,96);
mysql> insert into student(id,name,age,grade) values(3,'Lisi',24,93);
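
The SQL source polls the student table every run.query.delay milliseconds (5 seconds here) and records the last row it has read in sql-source.status, so each new INSERT should appear in HDFS shortly after it is committed. To check the output (the source typically writes each row as one CSV line):

> hdfs dfs -cat /flume/mysql/*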