2727import org .apache .kafka .clients .producer .ProducerRecord ;
2828import org .apache .kafka .common .serialization .StringDeserializer ;
2929import org .apache .kafka .common .serialization .StringSerializer ;
30+ import org .springframework .kafka .core .ConsumerFactory ;
31+ import org .springframework .kafka .core .DefaultKafkaConsumerFactory ;
32+ import org .springframework .kafka .listener .AcknowledgingMessageListener ;
33+ import org .springframework .kafka .listener .ConcurrentMessageListenerContainer ;
34+ import org .springframework .kafka .listener .ContainerProperties ;
3035
3136import javax .jms .Message ;
3237import java .io .File ;
@@ -50,6 +55,8 @@ public class KafkaToolService {
5055 private ScheduleManager scheduleManager = new ScheduleManager ();
5156 private Map <String , Message > receiverMessageMap = new HashMap <String , Message >();
5257
58+ private ConcurrentMessageListenerContainer <String , String > concurrentMessageListenerContainer = null ;
59+
5360 public void saveConfigure () throws Exception {
5461 saveConfigure (ConfigureUtil .getConfigureFile (fileName ));
5562 }
@@ -157,17 +164,47 @@ public boolean stopQuartzAction() throws Exception {
157164 * @Description: receiver端监听消息
158165 */
159166 public void receiverMessageListenerAction () {
167+ if (concurrentMessageListenerContainer == null ) {
168+ Properties props = new Properties ();
169+ props .put (ConsumerConfig .BOOTSTRAP_SERVERS_CONFIG , kafkaToolController .getUrlTextField ().getText ().trim ());
170+ props .put (ConsumerConfig .GROUP_ID_CONFIG , kafkaToolController .getGroupIdTextField ().getText ().trim ());
171+ props .put (ConsumerConfig .ENABLE_AUTO_COMMIT_CONFIG , "false" );
172+ props .put (ConsumerConfig .SESSION_TIMEOUT_MS_CONFIG , 180000 );
173+ props .put (ConsumerConfig .AUTO_OFFSET_RESET_CONFIG , "earliest" );
174+ props .put (ConsumerConfig .KEY_DESERIALIZER_CLASS_CONFIG , StringDeserializer .class );
175+ props .put (ConsumerConfig .VALUE_DESERIALIZER_CLASS_CONFIG , StringDeserializer .class );
176+ props .put (ConsumerConfig .MAX_PARTITION_FETCH_BYTES_CONFIG , 5242880 );
177+ props .put (ConsumerConfig .REQUEST_TIMEOUT_MS_CONFIG , 305000 );
178+ props .put ("pollTimeoutms" , 5000 );
179+ props .put (ConsumerConfig .MAX_POLL_RECORDS_CONFIG , 3 );
180+ ConsumerFactory <String , byte []> consumerFactory = new DefaultKafkaConsumerFactory (props );
181+ ContainerProperties containerProperties = new ContainerProperties (kafkaToolController .getReceiverQueueTextField ().getText ().trim ());
182+ containerProperties .setAckMode (ContainerProperties .AckMode .MANUAL );
183+
184+ concurrentMessageListenerContainer = new ConcurrentMessageListenerContainer (consumerFactory , containerProperties );
185+ concurrentMessageListenerContainer .setErrorHandler ((thrownException , data ) -> {
186+ log .error (" Kafka接收器ErrorHandler:" , thrownException );
187+ concurrentMessageListenerContainer .stop ();
188+ });
189+ concurrentMessageListenerContainer .setupMessageListener ((AcknowledgingMessageListener <String , String >) (record , acknowledgment ) -> {
190+ try {
191+ String timestamp = new SimpleDateFormat ("yyyy-MM-dd HH:mm:ss" ).format (new Date (record .timestamp ()));
192+ KafkaToolReceiverTableBean kafkaToolReceiverTableBean = new KafkaToolReceiverTableBean (record .key (), record .topic (), record .value (), "String" , timestamp , true );
193+ kafkaToolController .getReceiverTableData ().add (kafkaToolReceiverTableBean );
194+ acknowledgment .acknowledge ();
195+ } catch (Exception e ) {
196+ log .error ("ReceiverKafka处理消息失败:" + e .getMessage ());
197+ concurrentMessageListenerContainer .stop ();
198+ }
199+ });
200+ concurrentMessageListenerContainer .start ();
201+ }
160202 }
161203
162204 public void receiverMessageStopListenerAction () {
163- // if (connection != null) {
164- // try {
165- // connection.close();
166- // } catch (JMSException e) {
167- // log.error(e.getMessage());
168- // TooltipUtil.showToast(e.getMessage());
169- // }
170- // }
205+ if (concurrentMessageListenerContainer != null ) {
206+ concurrentMessageListenerContainer .stop ();
207+ }
171208 }
172209
173210 /**
@@ -189,17 +226,21 @@ public void receiverPullMessageAction() {
189226 props .put ("pollTimeoutms" , 5000 );
190227 props .put (ConsumerConfig .MAX_POLL_RECORDS_CONFIG , 3 );
191228 KafkaConsumer consumer = new KafkaConsumer (props );
192- consumer .subscribe (Arrays .asList (kafkaToolController .getReceiverQueueTextField ().getText ().trim ()));
193- ConsumerRecords <String , String > records = consumer .poll (0 );
194- for (ConsumerRecord <String , String > record : records ) {
229+ try {
230+ consumer .subscribe (Arrays .asList (kafkaToolController .getReceiverQueueTextField ().getText ().trim ()));
231+ ConsumerRecords <String , String > records = consumer .poll (1000 );
232+ for (ConsumerRecord <String , String > record : records ) {
195233// log.info("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
196- log .info ("msgValue" + record .value ());
197- String timestamp = new SimpleDateFormat ("yyyy-MM-dd HH:mm:ss" ).format (new Date (record .timestamp ()));
198- KafkaToolReceiverTableBean kafkaToolReceiverTableBean = new KafkaToolReceiverTableBean (
199- record .key (), record .topic (), record .value (), "String" , timestamp , false );
200- kafkaToolController .getReceiverTableData ().add (kafkaToolReceiverTableBean );
234+ log .info ("msgValue" + record .value ());
235+ String timestamp = new SimpleDateFormat ("yyyy-MM-dd HH:mm:ss" ).format (new Date (record .timestamp ()));
236+ KafkaToolReceiverTableBean kafkaToolReceiverTableBean = new KafkaToolReceiverTableBean (
237+ record .key (), record .topic (), record .value (), "String" , timestamp , false );
238+ kafkaToolController .getReceiverTableData ().add (kafkaToolReceiverTableBean );
239+ }
240+ consumer .commitAsync ();
241+ } finally {
242+ consumer .close ();
201243 }
202- consumer .commitAsync ();
203244 }
204245
205246 public KafkaToolService (KafkaToolController kafkaToolController ) {
0 commit comments