一、Java环境

二、kafka环境

基于docker 安装,开启服务,对外端口映射即可,创建Topic 使用Java Api操作即可。可参考其他资料关于docker/kafka的安装。
这是kafka的目录信息

三、代码示例,项目结构

maven + Java8 + IntelliJ IDEA + Springboot

  • 3.1 创建Maven工程,pom.xml 如下
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.xiaour.spring.boot</groupId>
	<artifactId>kafka</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>

	<name>kafka</name>
	<description>Kafka Demo project for Spring Boot</description>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.0.2.RELEASE</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
		<java.version>1.8</java.version>
	</properties>

	<dependencies>

		<dependency>
			<groupId>com.tonels</groupId>
			<artifactId>common</artifactId>
			<version>1.0-SNAPSHOT</version>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>

		<dependency>
			<groupId>com.google.code.gson</groupId>
			<artifactId>gson</artifactId>
			<version>2.8.2</version>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
			<version>RELEASE</version>
		</dependency>

	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>


</project>

  • 3.2 Springboot集成 kafka,通过Yml文件
server:
  servlet:
    context-path: /
  port: 8080
spring:
  kafka:
    bootstrap-servers: 192.168.1.74:9092
    #生产者的配置,大部分可以使用默认的,这里列出几个比较重要的属性
    producer:
      #每批次发送消息的数量
      batch-size: 16
      #设置大于0的值将使客户端重新发送任何数据,一旦这些数据发送失败。注意,这些重试与客户端接收到发送错误时的重试没有什么不同。允许重试将潜在的改变数据的顺序,如果这两个消息记录都是发送到同一个partition,则第一个消息失败第二个发送成功,则第二条消息会比第一条消息出现要早。
      retries: 0
      #producer可以用来缓存数据的内存大小。如果数据产生速度大于向broker发送的速度,producer会阻塞或者抛出异常,以“block.on.buffer.full”来表明。这项设置将和producer能够使用的总内存相关,但并不是一个硬性的限制,因为不是producer使用的所有内存都是用于缓存。一些额外的内存会用于压缩(如果引入压缩机制),同样还有一些用于维护请求。
      buffer-memory: 33554432
      #key序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    #消费者的配置
    consumer:
      #Kafka中没有初始偏移或如果当前偏移在服务器上不再存在时,默认区最新 ,有三个选项 【latest, earliest, none】
      auto-offset-reset: latest
      #是否开启自动提交
      enable-auto-commit: true
      #自动提交的时间间隔
      auto-commit-interval: 100
      #key的解码方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      #value的解码方式
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      #在/usr/local/etc/kafka/consumer.properties中有配置
      group-id: ls
  • 3.3 模拟生产者
    消息体message数据封装
@Data
public class Message {
   
    private String id;
    private String msg;
    private LocalDateTime sendTime;
}

生产过程


@Component
public class Producer {
   

    @Autowired
    private KafkaTemplate kafkaTemplate;

    private static Gson gson = new GsonBuilder().create();

    //发送消息方法
    public void send() {
   
        Message message = new Message();
        message.setId("KF_"+System.currentTimeMillis());
        message.setMsg(UUID.randomUUID().toString());
        message.setSendTime(LocalDateTime.now());

        // 指定topic sl_test
        kafkaTemplate.send("sl_test",JSONUtil.toJsonStr(message));
    }
}
```java

发送消息,模拟http发送消息
```java
@RestController
@RequestMapping("/kafka")
public class SendController {
   

    @Autowired
    private Producer producer;

    @RequestMapping(value = "/send")
    public String send() {
   
        producer.send();
        return "{\"code\":0}";
    }
}
  • 3.4 消费者自动消费Topic,并相应业务处理
@Component
public class Consumer {
   

    @KafkaListener(topics = {
   "sl_test"})
    public void listen(ConsumerRecord<?, ?> record){
   

        Optional<?> kafkaMessage = Optional.ofNullable(record.value());

        if (kafkaMessage.isPresent()) {
   

            Object message = kafkaMessage.get();
            System.out.println("---->"+record);
            System.out.println("---->"+message);

            Message m = JSONUtil.toBean(JSONUtil.toJsonStr(message), Message.class);
            System.out.println("转成对象后。。" + m);
        }

    }
}
  • 3.5 控制台输出
---->ConsumerRecord(topic = sl_test, partition = 0, offset = 29, CreateTime = 1567696265396, serialized key size = -1, serialized value size = 107, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"msg":"2689bc7a-9781-45ab-80b1-2b54b94c872b","id":"KF_1567696265344","sendTime":"2019-09-05T23:11:05.348"})
---->{"msg":"2689bc7a-9781-45ab-80b1-2b54b94c872b","id":"KF_1567696265344","sendTime":"2019-09-05T23:11:05.348"}
转成对象后。。Message(id=KF_1567696265344, msg=2689bc7a-9781-45ab-80b1-2b54b94c872b, sendTime=2019-09-05T23:11:05.348)
  • 3.6 查看服务器日志信息
    首先会自动创建 Topic

    日志文件里会基于生产的消息,存在日志

四、更多Kafka,,客户端、服务端,监控可参考下文