spring cloud stream 提供了消息队列的封装。最近公司对新同事进行了相关的培训。
这里顺便记录一下主要部分.
学习路径
首先,需要对spring boot 有一定的了解,并能够独立搭建spring boot 项目。
其次需要对maven 有所了解。这些为基础工作。
准备工作
首先需要了解kafka 基本原理。
参考网站:
http://kafka.apache.org/intro
https://www.jianshu.com/p/97011dab6c56
然后了解 spring cloud stream 的基本原理:
https://blog.csdn.net/yejingtao703/article/details/78059480
开发准备
kafka 准备
当然首先下载安装kafka.
下载地址: Kafka Download
这里默认在linux 环境下。
1. 解压
> tar -xzf kafka_2.12-2.2.0.tgz –版本号随着时间可能变化哦
> cd kafka_2.12-2.2.0
2. 启动zookeeper
> bin/zookeeper-server-start.sh config/zookeeper.properties
3. 启动kafka
> bin/kafka-server-start.sh config/server.properties
windows 用户自行换成 windows/*.bat 进行操作。
spring boot 准备
需要参考spring boot 的快速搭建。
地址如下:https://start.spring.io
开始编写
在spring boot 的基础之上,需要参考spring cloud stream 项目的教程。
地址: https://cloud.spring.io/spring-cloud-stream-binder-kafka/
demo 地址:https://github.com/spring-cloud/spring-cloud-stream-samples
根据上面的例子,我这里整理了一下。以最简单的例子进行演示,当然如果你有更哈
maven 依赖添加
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-kafka</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream</artifactId> </dependency><dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-dependencies</artifactId> <version>Fishtown.RELEASE</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
编写java类:
package com.cqmaple.mq.example.kafka.demo; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.Input; import org.springframework.cloud.stream.annotation.Output; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.context.annotation.Bean; import org.springframework.integration.annotation.InboundChannelAdapter; import org.springframework.integration.annotation.Poller; import org.springframework.integration.core.MessageSource; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.SubscribableChannel; import org.springframework.messaging.support.GenericMessage; import org.springframework.stereotype.Component; import java.util.Random; import java.util.concurrent.atomic.AtomicBoolean; public class SampleRunner { //Following code is only used as a test harness. //Following source is used as test producer. @EnableBinding(TestSource.class) static class TestProducer { private AtomicBoolean semaphore = new AtomicBoolean(true); private String[] randomWords = new String[]{"foo", "bar", "foobar", "baz", "fox"}; private Random random = new Random(); @Bean @InboundChannelAdapter(channel = TestSource.OUTPUT, poller = @Poller(fixedDelay = "1000")) public MessageSource<String> sendTestData() { return () -> { int idx = random.nextInt(5); return new GenericMessage<>(randomWords[idx]); }; } } //Following sink is used as test consumer for the above processor. It logs the data received through the processor. @EnableBinding(TestSink.class) static class TestConsumer { private final Log logger = LogFactory.getLog(getClass()); @StreamListener(TestSink.INPUT) public void receive(String data) { logger.info("Data received..." + data); } } interface TestSink { String INPUT = "input1"; @Input(INPUT) SubscribableChannel input1(); } interface TestSource { String OUTPUT = "output1"; @Output(TestSource.OUTPUT) MessageChannel output(); } }
配置文件:
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms: 1000 spring.cloud.stream.kafka.streams: binder.configuration: default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde spring.cloud.stream.bindings.input1.destination: words spring.cloud.stream.bindings.output1.destination: words
启动类:
package com.cqmaple.mq.example.kafka.demo; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class KafkaDemoApplication { public static void main(String[] args) { SpringApplication.run(KafkaDemoApplication.class, args); } }
运行结果:
2019-06-26 15:19:38.866 INFO 8408 — [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-3, groupId=anonymous.832febbd-3079-46f0-b3f8-59c707eaedbe] Successfully joined group with generation 1
2019-06-26 15:19:38.868 INFO 8408 — [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-3, groupId=anonymous.832febbd-3079-46f0-b3f8-59c707eaedbe] Setting newly assigned partitions [words-0]
2019-06-26 15:19:38.878 INFO 8408 — [container-0-C-1] o.a.k.c.consumer.internals.Fetcher : [Consumer clientId=consumer-3, groupId=anonymous.832febbd-3079-46f0-b3f8-59c707eaedbe] Resetting offset for partition words-0 to offset 888.
2019-06-26 15:19:38.882 INFO 8408 — [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$1 : partitions assigned: [words-0]
2019-06-26 15:19:39.733 INFO 8408 — [container-0-C-1] onsumer$$EnhancerBySpringCGLIB$$d1631839 : Data received…baz
2019-06-26 15:19:40.683 INFO 8408 — [container-0-C-1] onsumer$$EnhancerBySpringCGLIB$$d1631839 : Data received…baz
[…] 可以参照 spring cloud stream Kafka 示例 […]