上传并解压在/usr/local/src,并重命名
修改默认server.properties(kafka/config/),并修改/etc/profile文件,添加kafka环境变量
分发并修改权限:
修改各节点的server.properties
slave01:id=1
slave02:id=2
确保zookeeper集群正常
启动Kafka服务(集群都要)
master上打开一个新终端,创建一个topic,名为tz123
/usr/local/src/kafka/bin/kafka-topics.sh --create --zookeeper master-tz:2181,slave01-tz:2181,slave02-tz:2181 --replication-factor 2 --topic tz123 --partitions 1
//Create参数代表创建, zookeeper参数为zookeeper集群的主机名 ,replication-factor代表生成多少个副本文件,topic 为topic的名称,partitions指定多少个分区
在master创建一个生产者
/usr/local/src/kafka/bin/kafka-console-producer.sh --broker-list master-tz:9092,slave01-tz:9092,slave02-tz:9092 --topic tz123
//broker-list指定服务器,在Kafka 集群包含一个或多个服务器,这种服务器被称为 broker。Topic指定在hello上创建生产者。
在slave1创建一个消费者
/usr/local/src/kafka/bin/kafka-console-consumer.sh --zookeeper master-tz:2181,slave01-tz:2181,slave02-tz:2181 --topic tz123 --from-beginning
在生产者中输入信息,在消费者中查看信息
在slave01上就可以收到消息
在/usr/local/src/flume/conf/新建一个文件flume-syslog-kafka.conf
a1.sources=r1
a1.channels=c1
a1.sinks=k1
a1.sources.r1.type=syslogtcp
a1.sources.r1.port=6868
a1.sources.r1.host=master
a1.channels.c1.type=memory
a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.brokerList=master:9092,slave1:9092,slave2:9092
a1.sinks.k1.topic=tz123 //这是你之前创建的topic主题名
a1.sinks.k1.channel=c1
a1.sources.r1.channels=c1
然后启动flume
flume-ng agent --conf /usr/local/src/flume/conf/ --name a1 --conf-file /usr/local/src/flume/conf/flume-syslog-kafka.conf
在消费者slave01下查看
]]>