
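Spring Cloud Stream Kafka Example

Spring Cloud Stream provides an abstraction layer over message queues. My company recently ran a training session on it for new colleagues.

These are my notes on the main points.

Learning path

First, you need some familiarity with Spring Boot and should be able to set up a Spring Boot project on your own.

You also need a basic understanding of Maven. These are the prerequisites.

Preparation

Start by learning the basic principles of Kafka.

References: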

http://kafka.apache.org/intro

https://www.jianshu.com/p/97011dab6c56

Then learn the basic principles of Spring Cloud Stream:

https://blog.csdn.net/yejingtao703/article/details/78059480

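Development setup

Kafka setup

First, of course, download and install Kafka.

Download: Kafka Download

The steps below assume a Linux environment.

1. Extract the archive

> tar -xzf kafka_2.12-2.2.0.tgz   # the version number may change over time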
> cd kafka_2.12-2.2.0

2. Start ZooKeeper

> bin/zookeeper-server-start.sh config/zookeeper.properties

3. Start Kafka

> bin/kafka-server-start.sh config/server.properties

Windows users should run the corresponding bin/windows/*.bat scripts instead.
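
Before moving on, it can help to confirm that both processes are accepting connections. The following is a minimal sketch, assuming the stock config files are unchanged, so that ZooKeeper listens on port 2181 and the Kafka broker on port 9092, both on localhost. The class name PortCheck and the helper isListening are made up for this illustration.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

class PortCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    static boolean isListening(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unreachable.
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed defaults: 2181 for ZooKeeper, 9092 for the Kafka broker.
        System.out.println("ZooKeeper (2181): " + isListening("localhost", 2181, 1000));
        System.out.println("Kafka     (9092): " + isListening("localhost", 9092, 1000));
    }
}
```

A successful TCP connection only shows that something is listening on the port. It does not prove that the broker is healthy, so treat this as a quick sanity check.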

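Spring Boot setup

Use the Spring Boot quick-start generator to scaffold the project.

URL: https://start.spring.io

Writing the code

With the Spring Boot project in place, follow the Spring Cloud Stream tutorial.

URL: https://cloud.spring.io/spring-cloud-stream-binder-kafka/

Demo repository: https://github.com/spring-cloud/spring-cloud-stream-samples

I adapted the samples above and use the simplest possible example for this demonstration.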

Add the Maven dependencies:

<dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependencyManagement>
   <dependencies>
      <dependency>
         <groupId>org.springframework.cloud</groupId>
         <artifactId>spring-cloud-stream-dependencies</artifactId>
         <version>Fishtown.RELEASE</version>
         <type>pom</type>
         <scope>import</scope>
      </dependency>
   </dependencies>
</dependencyManagement>

Write the Java class:

package com.cqmaple.mq.example.kafka.demo;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.core.MessageSource;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.stereotype.Component;

import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;

public class SampleRunner  {


    //Following code is only used as a test harness.

    //Following source is used as test producer.
    @EnableBinding(TestSource.class)
    static class TestProducer {

        private AtomicBoolean semaphore = new AtomicBoolean(true);

        private String[] randomWords = new String[]{"foo", "bar", "foobar", "baz", "fox"};
        private Random random = new Random();

        @Bean
        @InboundChannelAdapter(channel = TestSource.OUTPUT, poller = @Poller(fixedDelay = "1000"))
        public MessageSource<String> sendTestData() {
            return () -> {
                // Pick a random index within the array bounds.
                int idx = random.nextInt(randomWords.length);
                return new GenericMessage<>(randomWords[idx]);
            };
        }
    }

    //Following sink is used as test consumer for the producer above. It logs each message it receives.
    @EnableBinding(TestSink.class)
    static class TestConsumer {

        private final Log logger = LogFactory.getLog(getClass());

        @StreamListener(TestSink.INPUT)
        public void receive(String data) {
            logger.info("Data received..." + data);
        }
    }

    interface TestSink {

        String INPUT = "input1";

        @Input(INPUT)
        SubscribableChannel input1();

    }

    interface TestSource {

        String OUTPUT = "output1";

        @Output(TestSource.OUTPUT)
        MessageChannel output();

    }
}
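
Later Spring Cloud Stream releases deprecate the annotation model used above (@EnableBinding, @StreamListener) in favor of plain java.util.function beans. As a rough sketch of what the same producer/consumer pair looks like in that shape, the code below uses only JDK types and wires the Supplier to the Consumer by hand, so it runs without Spring or Kafka. The class name FunctionalSketch and the list used to collect messages are hypothetical. In a real application each factory method would be a @Bean and the binder would do the wiring.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.function.Consumer;
import java.util.function.Supplier;

class FunctionalSketch {

    static final String[] RANDOM_WORDS = {"foo", "bar", "foobar", "baz", "fox"};

    // Counterpart of TestProducer.sendTestData(): each call yields one random word.
    static Supplier<String> sendTestData(Random random) {
        return () -> RANDOM_WORDS[random.nextInt(RANDOM_WORDS.length)];
    }

    // Counterpart of TestConsumer.receive(): records and prints each message.
    static Consumer<String> receive(List<String> received) {
        return data -> {
            received.add(data);
            System.out.println("Data received..." + data);
        };
    }

    public static void main(String[] args) {
        Supplier<String> source = sendTestData(new Random());
        List<String> received = new ArrayList<>();
        Consumer<String> sink = receive(received);
        // Hand-wired stand-in for the binder: poll the source three times
        // and deliver each word to the sink.
        for (int i = 0; i < 3; i++) {
            sink.accept(source.get());
        }
    }
}
```

If these two methods were registered as beans under Spring Cloud Stream 3.x, the bindings would, as far as I know, be named sendTestData-out-0 and receive-in-0 rather than output1 and input1, so the destination properties would need to be renamed accordingly.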

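Configuration file. Both bindings point at the same topic, words, so the consumer receives whatever the producer sends: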

spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms: 1000
spring.cloud.stream.kafka.streams:
  binder.configuration:
    default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
    default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde

spring.cloud.stream.bindings.input1.destination: words
spring.cloud.stream.bindings.output1.destination: words

Main application class:

package com.cqmaple.mq.example.kafka.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class KafkaDemoApplication {

   public static void main(String[] args) {
      SpringApplication.run(KafkaDemoApplication.class, args);
   }


}

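Output (no consumer group is set on the input binding, so the consumer joins a generated anonymous group):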

2019-06-26 15:19:38.866 INFO 8408 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-3, groupId=anonymous.832febbd-3079-46f0-b3f8-59c707eaedbe] Successfully joined group with generation 1
2019-06-26 15:19:38.868 INFO 8408 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-3, groupId=anonymous.832febbd-3079-46f0-b3f8-59c707eaedbe] Setting newly assigned partitions [words-0]
2019-06-26 15:19:38.878 INFO 8408 --- [container-0-C-1] o.a.k.c.consumer.internals.Fetcher : [Consumer clientId=consumer-3, groupId=anonymous.832febbd-3079-46f0-b3f8-59c707eaedbe] Resetting offset for partition words-0 to offset 888.
2019-06-26 15:19:38.882 INFO 8408 --- [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$1 : partitions assigned: [words-0]
2019-06-26 15:19:39.733 INFO 8408 --- [container-0-C-1] onsumer$$EnhancerBySpringCGLIB$$d1631839 : Data received...baz
2019-06-26 15:19:40.683 INFO 8408 --- [container-0-C-1] onsumer$$EnhancerBySpringCGLIB$$d1631839 : Data received...baz
