diff --git a/src/main/java/com/easysoftware/common/utils/ObjectMapperUtil.java b/src/main/java/com/easysoftware/common/utils/ObjectMapperUtil.java index 448817a24245c096b9737afe050c17896584ee6c..0d859297840608f26b1ec33c202507b573f122a8 100644 --- a/src/main/java/com/easysoftware/common/utils/ObjectMapperUtil.java +++ b/src/main/java/com/easysoftware/common/utils/ObjectMapperUtil.java @@ -59,6 +59,14 @@ public class ObjectMapperUtil { } } + public static T jsonToObject(String json, Class valueType) { + try { + return objectMapper.readValue(json, valueType); + } catch (Exception e) { + throw new MyJacksonException("字符串转化Java对象时异常"); + } + } + public static String writeValueAsStringForNull(Object obj) { objectMapper .getSerializerProvider() diff --git a/src/main/java/com/easysoftware/kafka/MyConsumer.java b/src/main/java/com/easysoftware/kafka/MyConsumer.java index 5c4191dba52aecfa242f9355b6fce0cd379e386a..6e1233aae96e06beb1ad2aabcccd8069157b989f 100644 --- a/src/main/java/com/easysoftware/kafka/MyConsumer.java +++ b/src/main/java/com/easysoftware/kafka/MyConsumer.java @@ -2,6 +2,7 @@ package com.easysoftware.kafka; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Properties; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -14,17 +15,21 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; -import org.springframework.stereotype.Repository; +import org.springframework.stereotype.Service; +import com.easysoftware.application.applicationversion.ApplicationVersionService; +import com.easysoftware.common.utils.ObjectMapperUtil; import com.easysoftware.domain.applicationversion.ApplicationVersion; import com.easysoftware.domain.applicationversion.gateway.ApplicationVersionGateway; -import com.fasterxml.jackson.databind.ObjectMapper; +import com.easysoftware.infrastructure.applicationversion.gatewayimpl.converter.ApplicationVersionConvertor; +import com.easysoftware.infrastructure.applicationversion.gatewayimpl.dataobject.ApplicationVersionDO; import jakarta.annotation.PostConstruct; import jakarta.annotation.Resource; -@Repository +@Service public class MyConsumer { @Value("${consumer.topic.name}") String topicNames; @@ -44,12 +49,13 @@ public class MyConsumer { @Value("${consumer.enableAutoCommit}") String enableAutoCommit; + @Autowired + private ApplicationVersionService appVersionService; @Resource ApplicationVersionGateway AppVersionGateway; private static ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(); - private static ObjectMapper objectMapper = new ObjectMapper(); private static final Logger logger = LoggerFactory.getLogger(MyConsumer.class); protected ArrayList> KafkaConsumerList= new ArrayList<>(); public static KafkaConsumer consumer; @@ -71,7 +77,7 @@ public class MyConsumer { public void KafkaToMysql() { for (KafkaConsumer customer : KafkaConsumerList) { ConsumerRecords poll = customer.poll(2); - dealData(poll); + dealDataByBatch(poll); customer.commitSync(); } } @@ -80,7 +86,7 @@ public class MyConsumer { for (ConsumerRecord record : records) { String value = record.value(); try { - ApplicationVersion appVersion = objectMapper.readValue(value, ApplicationVersion.class); + ApplicationVersion appVersion = ObjectMapperUtil.jsonToObject(value, ApplicationVersion.class); boolean found = AppVersionGateway.existApp(appVersion.getName()); if (found) { logger.info(String.format("The software %s is existed", appVersion.getName())); @@ -95,6 +101,32 @@ public class MyConsumer { } } + public void dealDataByBatch(ConsumerRecords records) { + Collection appList = new ArrayList<>(); + int partition = 0; + long offset = 0; + for (ConsumerRecord record : records) { + String value = record.value(); + try { + ApplicationVersion appVersion = ObjectMapperUtil.jsonToObject(value, ApplicationVersion.class); + boolean found = AppVersionGateway.existApp(appVersion.getName()); + if (found) { + logger.info(String.format("The software %s is existed", appVersion.getName())); + continue; + } + ApplicationVersionDO appVersionDO = ApplicationVersionConvertor.toDataObjectForCreate(appVersion); + appList.add(appVersionDO); + partition = record.partition(); + offset = record.offset(); + + } catch (Exception e) { + logger.error(e.getMessage() + ":" + value, e); + } + } + logger.info("partation: " + partition + ", offset: " + offset); + appVersionService.saveBatch(appList); + } + public void initConsumer() { Properties props = new Properties(); props.put("bootstrap.servers", bootstrapServers); diff --git a/src/main/java/com/easysoftware/kafka/Producer.java b/src/main/java/com/easysoftware/kafka/Producer.java index 6de2bf639d0cddf94c2af7e0512b608644762e69..254e8a43b60d777ee33896612a90a6be9489a127 100644 --- a/src/main/java/com/easysoftware/kafka/Producer.java +++ b/src/main/java/com/easysoftware/kafka/Producer.java @@ -7,10 +7,9 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.beans.factory.annotation.Value; -import org.springframework.stereotype.Repository; +import org.springframework.stereotype.Service; - -@Repository +@Service public class Producer { @Value("${bootstrap.servers}") String bootstrapServers;