Skip to content

Commit 81f6cab

Browse files
committed
优化Kafka收发功能。
1 parent 4a5756a commit 81f6cab

4 files changed

Lines changed: 56 additions & 162 deletions

File tree

src/main/java/com/xwintop/xJavaFxTool/controller/debugTools/KafkaToolController.java

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -43,16 +43,8 @@ public class KafkaToolController extends KafkaToolView {
4343
private KafkaToolService kafkaToolService = new KafkaToolService(this);
4444
private ObservableList<KafkaToolTableBean> tableData = FXCollections.observableArrayList();
4545
private ObservableList<KafkaToolReceiverTableBean> receiverTableData = FXCollections.observableArrayList();
46-
private String[] messageTypeStrings = new String[] { "TextMessage", "ObjectMessage", "BytesMessage", "MapMessage",
47-
"StreamMessage" };
46+
private String[] messageTypeStrings = new String[] { "TextMessage" };
4847
private String[] quartzChoiceBoxStrings = new String[] { "简单表达式", "Cron表达式" };
49-
@SuppressWarnings("unchecked")
50-
private XProperty<Integer>[] receiverAcknowledgeModeChoiceBoxValues = new XProperty[] {
51-
new XProperty<Integer>("SESSION_TRANSACTED", Session.SESSION_TRANSACTED),
52-
new XProperty<Integer>("AUTO_ACKNOWLEDGE", Session.AUTO_ACKNOWLEDGE),
53-
new XProperty<Integer>("CLIENT_ACKNOWLEDGE", Session.CLIENT_ACKNOWLEDGE),
54-
new XProperty<Integer>("DUPS_OK_ACKNOWLEDGE", Session.DUPS_OK_ACKNOWLEDGE),
55-
new XProperty<Integer>("INDIVIDUAL_ACKNOWLEDGE", ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE) };
5648

5749
@Override
5850
public void initialize(URL location, ResourceBundle resources) {
@@ -103,8 +95,6 @@ private void initView() {
10395
}
10496

10597
private void initReceiverView() {
106-
receiverAcknowledgeModeChoiceBox.getItems().addAll(receiverAcknowledgeModeChoiceBoxValues);
107-
receiverAcknowledgeModeChoiceBox.setValue(receiverAcknowledgeModeChoiceBoxValues[4]);
10898
receiverMessageIDTableColumn
10999
.setCellValueFactory(new PropertyValueFactory<KafkaToolReceiverTableBean, String>("messageID"));
110100
receiverQueueTableColumn

src/main/java/com/xwintop/xJavaFxTool/services/debugTools/KafkaToolService.java

Lines changed: 46 additions & 142 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package com.xwintop.xJavaFxTool.services.debugTools;
22

3-
import com.google.gson.Gson;
43
import com.xwintop.xJavaFxTool.controller.debugTools.KafkaToolController;
5-
import com.xwintop.xJavaFxTool.job.ActiveMqToolJob;
64
import com.xwintop.xJavaFxTool.job.KafkaToolJob;
75
import com.xwintop.xJavaFxTool.manager.ScheduleManager;
86
import com.xwintop.xJavaFxTool.model.KafkaToolReceiverTableBean;
@@ -16,25 +14,24 @@
1614
import lombok.Getter;
1715
import lombok.Setter;
1816
import lombok.extern.log4j.Log4j;
19-
import org.apache.activemq.ActiveMQConnectionFactory;
20-
import org.apache.activemq.command.ActiveMQBytesMessage;
21-
import org.apache.activemq.command.ActiveMQMapMessage;
2217
import org.apache.commons.configuration.PropertiesConfiguration;
2318
import org.apache.commons.io.FileUtils;
2419
import org.apache.commons.lang.StringUtils;
20+
import org.apache.kafka.clients.consumer.ConsumerConfig;
21+
import org.apache.kafka.clients.consumer.ConsumerRecord;
22+
import org.apache.kafka.clients.consumer.ConsumerRecords;
23+
import org.apache.kafka.clients.consumer.KafkaConsumer;
2524
import org.apache.kafka.clients.producer.KafkaProducer;
2625
import org.apache.kafka.clients.producer.Producer;
26+
import org.apache.kafka.clients.producer.ProducerConfig;
2727
import org.apache.kafka.clients.producer.ProducerRecord;
28-
import org.quartz.*;
29-
import org.quartz.impl.StdSchedulerFactory;
28+
import org.apache.kafka.common.serialization.StringDeserializer;
29+
import org.apache.kafka.common.serialization.StringSerializer;
3030

31-
import javax.jms.*;
31+
import javax.jms.Message;
3232
import java.io.File;
3333
import java.text.SimpleDateFormat;
34-
import java.util.Date;
35-
import java.util.HashMap;
36-
import java.util.Map;
37-
import java.util.Properties;
34+
import java.util.*;
3835
import java.util.function.Consumer;
3936

4037
@Getter
@@ -104,45 +101,41 @@ public void loadingConfigureAction() {
104101

105102
public void sendAction() {
106103
Properties props = new Properties();
107-
props.put("bootstrap.servers", kafkaToolController.getUrlTextField().getText().trim());
108-
props.put("acks", "all");
109-
props.put("retries", 0);
110-
props.put("batch.size", 16384);
111-
props.put("linger.ms", 1);
112-
props.put("buffer.memory", 33554432);
113-
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
114-
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
104+
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaToolController.getUrlTextField().getText().trim());
105+
props.put(ProducerConfig.ACKS_CONFIG, "all");
106+
props.put(ProducerConfig.RETRIES_CONFIG, 0);
107+
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
108+
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
109+
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
110+
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
111+
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
115112
for (KafkaToolTableBean tableBean : kafkaToolController.getTableData()) {
116113
if (tableBean.getIsSend()) {
117114
int sendNumber = Integer.parseInt(tableBean.getSendNumber());
118-
// ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
119-
// kafkaToolController.getUserNameTextField().getText(),
120-
// kafkaToolController.getPasswordTextField().getText(),
121-
// "tcp://" + kafkaToolController.getUrlTextField().getText().trim());
122115
Producer<String, String> producer = new KafkaProducer<>(props);
123116
try {
124117
for (int i = 0; i < sendNumber; i++) {
125-
producer.send(new ProducerRecord<>(tableBean.getQueue(), tableBean.getMessageType() , tableBean.getMessage()));
118+
producer.send(new ProducerRecord<>(tableBean.getQueue(), null, tableBean.getMessage()));
126119
}
127120
} catch (Exception e) {
128-
e.printStackTrace();
121+
log.error("发送失败:", e);
129122
TooltipUtil.showToast("发送失败:" + e.getMessage());
130123
} finally {
131-
producer.close();
124+
producer.close();
132125
}
133126
}
134127
}
135128
}
136129

137130
public boolean runQuartzAction(String quartzType, String cronText, int interval, int repeatCount) throws Exception {
138131
if ("简单表达式".equals(quartzType)) {
139-
scheduleManager.runQuartzAction(KafkaToolJob.class,this,interval,repeatCount);
132+
scheduleManager.runQuartzAction(KafkaToolJob.class, this, interval, repeatCount);
140133
} else if ("Cron表达式".equals(quartzType)) {
141134
if (StringUtils.isEmpty(cronText)) {
142135
TooltipUtil.showToast("cron表达式不能为空。");
143136
return false;
144137
}
145-
scheduleManager.runQuartzAction(KafkaToolJob.class,this,cronText);
138+
scheduleManager.runQuartzAction(KafkaToolJob.class, this, cronText);
146139
}
147140
return true;
148141
}
@@ -157,47 +150,6 @@ public boolean stopQuartzAction() throws Exception {
157150
* @Description: receiver端监听消息
158151
*/
159152
public void receiverMessageListenerAction() {
160-
// // Session: 一个发送或接收消息的线程
161-
// Session session;
162-
// // Destination :消息的目的地;消息发送给谁.
163-
// Destination destination;
164-
// // 消费者,消息接收者
165-
// MessageConsumer consumer;
166-
// connectionFactory = new ActiveMQConnectionFactory(kafkaToolController.getUserNameTextField().getText(),
167-
// kafkaToolController.getPasswordTextField().getText(),
168-
// "tcp://" + kafkaToolController.getUrlTextField().getText().trim());
169-
// try {
170-
// if (connection != null) {
171-
// connection.close();
172-
// }
173-
// // 构造从工厂得到连接对象
174-
// connection = connectionFactory.createConnection();
175-
// // 启动
176-
// connection.start();
177-
// // 获取操作连接
178-
// session = connection.createSession(Boolean.FALSE, kafkaToolController
179-
// .getReceiverAcknowledgeModeChoiceBox().getValue().getBean());
180-
// // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在Kafka的console配置
181-
// String queue = kafkaToolController.getReceiverQueueTextField().getText();
182-
// destination = session.createQueue(queue);
183-
// consumer = session.createConsumer(destination);
184-
// kafkaToolController.getReceiverTableData().clear();
185-
// receiverMessageMap.clear();
186-
// consumer.setMessageListener(new MessageListener() {// 有事务限制
187-
// @Override
188-
// public void onMessage(Message message) {
189-
// addReceiverTableBean(message);
190-
// // try {
191-
// // session.commit();
192-
// // } catch (JMSException e) {
193-
// // e.printStackTrace();
194-
// // }
195-
// }
196-
// });
197-
// } catch (Exception e) {
198-
// log.error(e.getMessage());
199-
// TooltipUtil.showToast(e.getMessage());
200-
// }
201153
}
202154

203155
public void receiverMessageStopListenerAction() {
@@ -211,84 +163,36 @@ public void receiverMessageStopListenerAction() {
211163
// }
212164
}
213165

214-
private void addReceiverTableBean(Message message) {
215-
String queue = kafkaToolController.getReceiverQueueTextField().getText();
216-
String messageType = "TextMessage";
217-
String messageSring = null;
218-
boolean isAcknowledge = false;
219-
try {
220-
if (message instanceof TextMessage) {
221-
messageType = "TextMessage";
222-
messageSring = ((TextMessage) message).getText();
223-
} else if (message instanceof ObjectMessage) {
224-
messageType = "ObjectMessage";
225-
messageSring = ((ObjectMessage) message).getObject().toString();
226-
} else if (message instanceof BytesMessage) {
227-
messageType = "BytesMessage";
228-
messageSring = new String(((ActiveMQBytesMessage) message).getContent().getData());
229-
} else if (message instanceof MapMessage) {
230-
messageType = "MapMessage";
231-
messageSring = ((ActiveMQMapMessage) message).getContentMap().toString();
232-
} else if (message instanceof StreamMessage) {
233-
messageType = "StreamMessage";
234-
messageSring = ((StreamMessage) message).readString();
235-
}
236-
String timestamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(message.getJMSTimestamp()));
237-
KafkaToolReceiverTableBean kafkaToolReceiverTableBean = new KafkaToolReceiverTableBean(
238-
message.getJMSMessageID(), queue, messageSring, messageType, timestamp, isAcknowledge);
239-
kafkaToolController.getReceiverTableData().add(kafkaToolReceiverTableBean);
240-
receiverMessageMap.put(message.getJMSMessageID(), message);
241-
} catch (Exception e) {
242-
log.error(e.getMessage());
243-
}
244-
}
245-
246166
/**
247167
* @Title: receiverPullMessageAction
248168
* @Description: receiver端拉取消息
249169
*/
250170
@FXML
251171
public void receiverPullMessageAction() {
252-
// // Session: 一个发送或接收消息的线程
253-
// Session session;
254-
// // Destination :消息的目的地;消息发送给谁.
255-
// Destination destination;
256-
// // 消费者,消息接收者
257-
// MessageConsumer consumer;
258-
// connectionFactory = new ActiveMQConnectionFactory(kafkaToolController.getUserNameTextField().getText(),
259-
// kafkaToolController.getPasswordTextField().getText(),
260-
// "tcp://" + kafkaToolController.getUrlTextField().getText().trim());
261-
// try {
262-
// if (connection != null) {
263-
// connection.close();
264-
// }
265-
// // 构造从工厂得到连接对象
266-
// connection = connectionFactory.createConnection();
267-
// // 启动
268-
// connection.start();
269-
// // 获取操作连接
270-
// // session = connection.createSession(Boolean.FALSE,
271-
// // Session.AUTO_ACKNOWLEDGE);
272-
// session = connection.createSession(Boolean.FALSE, kafkaToolController
273-
// .getReceiverAcknowledgeModeChoiceBox().getValue().getBean());
274-
// // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在Kafka的console配置
275-
// String queue = kafkaToolController.getReceiverQueueTextField().getText();
276-
// destination = session.createQueue(queue);
277-
// consumer = session.createConsumer(destination);
278-
// kafkaToolController.getReceiverTableData().clear();
279-
// receiverMessageMap.clear();
280-
// while (true) {
281-
// // 设置接收者接收消息的时间,为了便于测试,这里谁定为100s
282-
// Message message = consumer.receive(1000);
283-
// if (null == message) {
284-
// break;
285-
// }
286-
// addReceiverTableBean(message);
287-
// }
288-
// } catch (Exception e) {
289-
// log.error(e.getMessage());
290-
// TooltipUtil.showToast(e.getMessage());
291-
// }
172+
Properties props = new Properties();
173+
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaToolController.getUrlTextField().getText().trim());
174+
props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaToolController.getGroupIdTextField().getText().trim());
175+
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
176+
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 180000);
177+
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
178+
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
179+
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
180+
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 5242880);
181+
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 305000);
182+
props.put("pollTimeoutms", 5000);
183+
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 3);
184+
KafkaConsumer consumer = new KafkaConsumer(props);
185+
consumer.subscribe(Arrays.asList(kafkaToolController.getReceiverQueueTextField().getText().trim()));
186+
ConsumerRecords<String, String> records = consumer.poll(0);
187+
for (ConsumerRecord<String, String> record : records) {
188+
// log.info("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
189+
log.info("msgValue" + record.value());
190+
String timestamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(record.timestamp()));
191+
KafkaToolReceiverTableBean kafkaToolReceiverTableBean = new KafkaToolReceiverTableBean(
192+
record.key(), record.topic(), record.value(), "String", timestamp, false);
193+
kafkaToolController.getReceiverTableData().add(kafkaToolReceiverTableBean);
194+
}
195+
consumer.commitAsync();
292196
}
293197

294198
public KafkaToolService(KafkaToolController kafkaToolController) {

src/main/java/com/xwintop/xJavaFxTool/view/debugTools/KafkaToolView.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,12 +75,12 @@ public abstract class KafkaToolView implements Initializable {
7575
@FXML
7676
protected TextField receiverQueueTextField;
7777
@FXML
78+
protected TextField groupIdTextField;
79+
@FXML
7880
protected Button receiverPullMessageButton;
7981
@FXML
8082
protected Button receiverMessageListenerButton;
8183
@FXML
82-
protected ChoiceBox<XProperty<Integer>> receiverAcknowledgeModeChoiceBox;
83-
@FXML
8484
protected TableView<KafkaToolReceiverTableBean> receiverTableView;
8585
@FXML
8686
protected TableColumn<KafkaToolReceiverTableBean, String> receiverMessageIDTableColumn;

src/main/resources/com/xwintop/xJavaFxTool/fxmlView/debugTools/KafkaTool.fxml

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
<?import javafx.scene.layout.VBox?>
2020
<?import javafx.scene.text.Font?>
2121

22-
<AnchorPane prefHeight="522.0" prefWidth="844.0" xmlns="http://javafx.com/javafx/8.0.141" xmlns:fx="http://javafx.com/fxml/1" fx:controller="com.xwintop.xJavaFxTool.controller.debugTools.KafkaToolController">
22+
<AnchorPane prefHeight="522.0" prefWidth="844.0" xmlns="http://javafx.com/javafx/8.0.171" xmlns:fx="http://javafx.com/fxml/1" fx:controller="com.xwintop.xJavaFxTool.controller.debugTools.KafkaToolController">
2323
<children>
2424
<BorderPane layoutX="14.0" layoutY="7.0" AnchorPane.bottomAnchor="10.0" AnchorPane.leftAnchor="10.0" AnchorPane.rightAnchor="10.0" AnchorPane.topAnchor="10.0">
2525
<top>
@@ -33,9 +33,9 @@
3333
<Label text="连接地址:" />
3434
<TextField fx:id="urlTextField" text="localhost:61616" />
3535
<Label text="用户名:" />
36-
<TextField fx:id="userNameTextField" />
36+
<TextField fx:id="userNameTextField" disable="true" />
3737
<Label text="密码:" />
38-
<TextField fx:id="passwordTextField" />
38+
<TextField fx:id="passwordTextField" disable="true" />
3939
</children>
4040
</HBox>
4141
</children>
@@ -153,14 +153,14 @@
153153
<Insets left="-5.0" />
154154
</HBox.margin>
155155
</TextField>
156-
<Button fx:id="receiverPullMessageButton" mnemonicParsing="false" onAction="#receiverPullMessageAction" text="拉取消息" />
157-
<Button fx:id="receiverMessageListenerButton" mnemonicParsing="false" onAction="#receiverMessageListenerAction" text="监听消息" />
158-
<Label text="AcknowledgeMode:">
156+
<Label text="groupId:">
159157
<HBox.margin>
160158
<Insets left="10.0" />
161159
</HBox.margin>
162160
</Label>
163-
<ChoiceBox fx:id="receiverAcknowledgeModeChoiceBox" prefWidth="200.0" />
161+
<TextField fx:id="groupIdTextField" text="xJavaFxTool" />
162+
<Button fx:id="receiverPullMessageButton" mnemonicParsing="false" onAction="#receiverPullMessageAction" text="拉取消息" />
163+
<Button fx:id="receiverMessageListenerButton" mnemonicParsing="false" onAction="#receiverMessageListenerAction" text="监听消息" />
164164
</children>
165165
<VBox.margin>
166166
<Insets top="5.0" />

0 commit comments

Comments
 (0)