flume与kafka集成

1.KafkaSink

		[生产者]
		# Agent a1: netcat source -> memory channel -> Kafka sink.
		# Flume acts as a Kafka producer: every line received on the netcat port
		# is published to the Kafka topic configured on the sink.
		a1.sources = r1
		a1.sinks = k1
		a1.channels = c1

		# Netcat source listening on localhost:8888.
		a1.sources.r1.type=netcat
		a1.sources.r1.bind=localhost
		a1.sources.r1.port=8888

		# Kafka sink publishing to topic test3 on broker s202:9092.
		a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
		a1.sinks.k1.kafka.topic = test3
		a1.sinks.k1.kafka.bootstrap.servers = s202:9092
		# The batch-size key is "flumeBatchSize" WITHOUT the "kafka." prefix;
		# "kafka.flumeBatchSize" is not read by the sink, so the default would apply.
		a1.sinks.k1.flumeBatchSize = 20
		# Properties under kafka.producer.* are handed to the Kafka producer.
		a1.sinks.k1.kafka.producer.acks = 1

		a1.channels.c1.type=memory

		# Wire the source and the sink to the channel.
		a1.sources.r1.channels = c1
		a1.sinks.k1.channel = c1

2.KafkaSource

		[消费者]
		# Agent a1: Kafka source -> memory channel -> logger sink.
		# Flume acts as a Kafka consumer: messages read from the topic are logged.
		a1.sources = r1
		a1.sinks = k1
		a1.channels = c1

		# Kafka source consuming topic test3 from broker s202:9092 as group g4.
		a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
		# A batch is written to the channel once it holds batchSize events or
		# batchDurationMillis has elapsed, whichever happens first.
		a1.sources.r1.batchSize = 5000
		a1.sources.r1.batchDurationMillis = 2000
		a1.sources.r1.kafka.bootstrap.servers = s202:9092
		a1.sources.r1.kafka.topics = test3
		a1.sources.r1.kafka.consumer.group.id = g4

		a1.sinks.k1.type = logger

		a1.channels.c1.type=memory
		# The memory channel defaults (capacity 100, transactionCapacity 100) are
		# smaller than the source batchSize above; a batch of more than 100 events
		# would be rejected by the channel. Size the channel to fit one full batch.
		a1.channels.c1.capacity = 10000
		a1.channels.c1.transactionCapacity = 5000

		# Wire the source and the sink to the channel.
		a1.sources.r1.channels = c1
		a1.sinks.k1.channel = c1

3.KafkaChannel

		[生产者 + 消费者]
		# Agent a1: avro source -> Kafka channel -> logger sink.
		# The channel itself is backed by a Kafka topic, so it both produces to
		# and consumes from Kafka.
		a1.channels = c1
		a1.sources = r1
		a1.sinks = k1

		# Kafka-backed channel on topic test3, broker s202:9092, consumer group g6.
		# NOTE(review): test3 is also the topic used by the KafkaSink/KafkaSource
		# examples; presumably a dedicated topic is safer if they run together - confirm.
		a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
		a1.channels.c1.kafka.topic = test3
		a1.channels.c1.kafka.bootstrap.servers = s202:9092
		a1.channels.c1.kafka.consumer.group.id = g6

		# Avro source listening on localhost:8888, feeding channel c1.
		a1.sources.r1.type = avro
		a1.sources.r1.bind = localhost
		a1.sources.r1.port = 8888
		a1.sources.r1.channels = c1

		# Logger sink draining channel c1.
		a1.sinks.k1.type = logger
		a1.sinks.k1.channel = c1


发表评论:

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

微信扫一扫

微信扫一扫

微信扫一扫,分享到朋友圈

flume与kafka集成