Skip to content

Commit e90c562

Browse files
committed
优化传输工具Receiver空任务执行。
1 parent b86bbcf commit e90c562

30 files changed

Lines changed: 71 additions & 35 deletions

src/main/java/com/xwintop/xTransfer/common/model/Msg.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.xwintop.xTransfer.common.model;
22

33
import com.xwintop.xTransfer.messaging.IMessage;
4+
import org.apache.commons.lang3.ArrayUtils;
45
import org.apache.commons.lang3.StringUtils;
56
import org.apache.commons.lang3.time.DateFormatUtils;
67

@@ -48,7 +49,7 @@ public Msg(IMessage msg) {
4849
if (null != msg.getRawData()) {
4950
this.fields.put("length", msg.getRawData().length);
5051
} else if (null != msg.getMessage()) {
51-
this.fields.put("length", msg.getMessage().length);
52+
this.fields.put("length", ArrayUtils.getLength(msg.getMessage()));
5253
}
5354
}
5455

src/main/java/com/xwintop/xTransfer/filter/service/impl/FilterBackupImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import lombok.Data;
1717
import lombok.extern.slf4j.Slf4j;
1818
import org.apache.commons.io.FileUtils;
19+
import org.apache.commons.lang3.ArrayUtils;
1920
import org.apache.commons.lang3.StringUtils;
2021
import org.springframework.context.annotation.Scope;
2122
import org.springframework.stereotype.Service;
@@ -121,7 +122,7 @@ public void doFilter(IMessage msg, Map params) throws Exception {
121122
msgLogInfo.put(LOGKEYS.CHANNEL_OUT_TYPE, LOGVALUES.CHANNEL_TYPE_FS);
122123
msgLogInfo.put(LOGKEYS.CHANNEL_OUT, filterConfigBackup.getPath());
123124
msgLogInfo.put(LOGKEYS.MSG_TAG, msg.getFileName());
124-
msgLogInfo.put(LOGKEYS.MSG_LENGTH, msg.getMessage().length);
125+
msgLogInfo.put(LOGKEYS.MSG_LENGTH, ArrayUtils.getLength(msg.getMessage()));
125126
msgLogInfo.put(LOGKEYS.JOB_ID, params.get(TaskQuartzJob.JOBID));
126127
msgLogInfo.put(LOGKEYS.JOB_SEQ, params.get(TaskQuartzJob.JOBSEQ));
127128
msgLogInfo.put(LOGKEYS.RECEIVER_TYPE, msg.getProperty(LOGKEYS.RECEIVER_TYPE));

src/main/java/com/xwintop/xTransfer/filter/service/impl/FilterGroovyScriptImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import groovy.lang.Binding;
1515
import groovy.lang.GroovyShell;
1616
import lombok.extern.slf4j.Slf4j;
17+
import org.apache.commons.lang3.ArrayUtils;
1718
import org.apache.commons.lang3.StringUtils;
1819
import org.springframework.context.annotation.Scope;
1920
import org.springframework.stereotype.Service;
@@ -70,7 +71,7 @@ public void doFilter(IMessage msg, Map params) throws Exception {
7071
msgLogInfo.put(LOGKEYS.CHANNEL_IN_TYPE, msg.getProperty(LOGKEYS.CHANNEL_IN_TYPE));
7172
msgLogInfo.put(LOGKEYS.CHANNEL_IN, msg.getProperty(LOGKEYS.CHANNEL_IN));
7273
msgLogInfo.put(LOGKEYS.MSG_TAG, msg.getFileName());
73-
msgLogInfo.put(LOGKEYS.MSG_LENGTH, msg.getMessage().length);
74+
msgLogInfo.put(LOGKEYS.MSG_LENGTH, ArrayUtils.getLength(msg.getMessage()));
7475
msgLogInfo.put(LOGKEYS.JOB_ID, params.get(TaskQuartzJob.JOBID));
7576
msgLogInfo.put(LOGKEYS.JOB_SEQ, params.get(TaskQuartzJob.JOBSEQ));
7677
msgLogInfo.put(LOGKEYS.RECEIVER_TYPE, msg.getProperty(LOGKEYS.RECEIVER_TYPE));

src/main/java/com/xwintop/xTransfer/filter/service/impl/FilterJavaScriptImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import com.xwintop.xTransfer.messaging.IMessage;
1313
import com.xwintop.xTransfer.task.quartz.TaskQuartzJob;
1414
import lombok.extern.slf4j.Slf4j;
15+
import org.apache.commons.lang3.ArrayUtils;
1516
import org.apache.commons.lang3.StringUtils;
1617
import org.springframework.context.annotation.Scope;
1718
import org.springframework.stereotype.Service;
@@ -71,7 +72,7 @@ public void doFilter(IMessage msg, Map params) throws Exception {
7172
msgLogInfo.put(LOGKEYS.CHANNEL_IN_TYPE, msg.getProperty(LOGKEYS.CHANNEL_IN_TYPE));
7273
msgLogInfo.put(LOGKEYS.CHANNEL_IN, msg.getProperty(LOGKEYS.CHANNEL_IN));
7374
msgLogInfo.put(LOGKEYS.MSG_TAG, msg.getFileName());
74-
msgLogInfo.put(LOGKEYS.MSG_LENGTH, msg.getMessage().length);
75+
msgLogInfo.put(LOGKEYS.MSG_LENGTH, ArrayUtils.getLength(msg.getMessage()));
7576
msgLogInfo.put(LOGKEYS.JOB_ID, params.get(TaskQuartzJob.JOBID));
7677
msgLogInfo.put(LOGKEYS.JOB_SEQ, params.get(TaskQuartzJob.JOBSEQ));
7778
msgLogInfo.put(LOGKEYS.RECEIVER_TYPE, msg.getProperty(LOGKEYS.RECEIVER_TYPE));

src/main/java/com/xwintop/xTransfer/filter/service/impl/FilterLuaScriptImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import com.xwintop.xTransfer.messaging.IMessage;
1313
import com.xwintop.xTransfer.task.quartz.TaskQuartzJob;
1414
import lombok.extern.slf4j.Slf4j;
15+
import org.apache.commons.lang3.ArrayUtils;
1516
import org.apache.commons.lang3.StringUtils;
1617
import org.luaj.vm2.Globals;
1718
import org.luaj.vm2.LuaValue;
@@ -74,7 +75,7 @@ public void doFilter(IMessage msg, Map params) throws Exception {
7475
msgLogInfo.put(LOGKEYS.CHANNEL_IN_TYPE, msg.getProperty(LOGKEYS.CHANNEL_IN_TYPE));
7576
msgLogInfo.put(LOGKEYS.CHANNEL_IN, msg.getProperty(LOGKEYS.CHANNEL_IN));
7677
msgLogInfo.put(LOGKEYS.MSG_TAG, msg.getFileName());
77-
msgLogInfo.put(LOGKEYS.MSG_LENGTH, msg.getMessage().length);
78+
msgLogInfo.put(LOGKEYS.MSG_LENGTH, ArrayUtils.getLength(msg.getMessage()));
7879
msgLogInfo.put(LOGKEYS.JOB_ID, params.get(TaskQuartzJob.JOBID));
7980
msgLogInfo.put(LOGKEYS.JOB_SEQ, params.get(TaskQuartzJob.JOBSEQ));
8081
msgLogInfo.put(LOGKEYS.RECEIVER_TYPE, msg.getProperty(LOGKEYS.RECEIVER_TYPE));

src/main/java/com/xwintop/xTransfer/filter/service/impl/FilterOracleSqlldrImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ public void doFilter(IMessage msg, Map params) throws Exception {
9898
// msgLogInfo.put(LOGKEYS.CHANNEL_OUT_TYPE, "MSGTODB");
9999
// msgLogInfo.put(LOGKEYS.CHANNEL_OUT, msg.getFileName());
100100
// msgLogInfo.put(LOGKEYS.MSG_TAG, msg.getFileName());
101-
// msgLogInfo.put(LOGKEYS.MSG_LENGTH, msg.getMessage().length);
101+
// msgLogInfo.put(LOGKEYS.MSG_LENGTH, ArrayUtils.getLength(msg.getMessage()));
102102
// msgLogInfo.put(LOGKEYS.JOB_ID, params.get(TaskQuartzJob.JOBID));
103103
// msgLogInfo.put(LOGKEYS.JOB_SEQ, params.get(TaskQuartzJob.JOBSEQ));
104104
// msgLogInfo.put(LOGKEYS.RECEIVER_TYPE, msg.getProperty(LOGKEYS.RECEIVER_TYPE));

src/main/java/com/xwintop/xTransfer/filter/service/impl/FilterPythonScriptImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import com.xwintop.xTransfer.messaging.IMessage;
1313
import com.xwintop.xTransfer.task.quartz.TaskQuartzJob;
1414
import lombok.extern.slf4j.Slf4j;
15+
import org.apache.commons.lang3.ArrayUtils;
1516
import org.apache.commons.lang3.StringUtils;
1617
import org.python.util.PythonInterpreter;
1718
import org.springframework.context.annotation.Scope;
@@ -66,7 +67,7 @@ public void doFilter(IMessage msg, Map params) throws Exception {
6667
msgLogInfo.put(LOGKEYS.CHANNEL_IN_TYPE, msg.getProperty(LOGKEYS.CHANNEL_IN_TYPE));
6768
msgLogInfo.put(LOGKEYS.CHANNEL_IN, msg.getProperty(LOGKEYS.CHANNEL_IN));
6869
msgLogInfo.put(LOGKEYS.MSG_TAG, msg.getFileName());
69-
msgLogInfo.put(LOGKEYS.MSG_LENGTH, msg.getMessage().length);
70+
msgLogInfo.put(LOGKEYS.MSG_LENGTH, ArrayUtils.getLength(msg.getMessage()));
7071
msgLogInfo.put(LOGKEYS.JOB_ID, params.get(TaskQuartzJob.JOBID));
7172
msgLogInfo.put(LOGKEYS.JOB_SEQ, params.get(TaskQuartzJob.JOBSEQ));
7273
msgLogInfo.put(LOGKEYS.RECEIVER_TYPE, msg.getProperty(LOGKEYS.RECEIVER_TYPE));

src/main/java/com/xwintop/xTransfer/receiver/service/impl/ReceiverActiveMqImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.apache.activemq.ActiveMQConnectionFactory;
1414
import org.apache.activemq.command.ActiveMQBytesMessage;
1515
import org.apache.activemq.command.ActiveMQMapMessage;
16+
import org.apache.commons.lang3.ArrayUtils;
1617
import org.apache.commons.lang3.StringUtils;
1718
import org.springframework.context.annotation.Scope;
1819
import org.springframework.jms.connection.CachingConnectionFactory;
@@ -103,7 +104,7 @@ public void receive(Map params) throws Exception {
103104
msgLogInfo.put(LOGKEYS.CHANNEL_IN_TYPE, LOGVALUES.CHANNEL_TYPE_ACTIVE_MQ);
104105
msgLogInfo.put(LOGKEYS.CHANNEL_IN, receiverConfigActiveMq.getHost() + ":" + receiverConfigActiveMq.getQueueName());
105106
msgLogInfo.put(LOGKEYS.MSG_TAG, msg.getFileName());
106-
msgLogInfo.put(LOGKEYS.MSG_LENGTH, msg.getMessage().length);
107+
msgLogInfo.put(LOGKEYS.MSG_LENGTH, ArrayUtils.getLength(msg.getMessage()));
107108
msgLogInfo.put(LOGKEYS.JOB_ID, params.get(TaskQuartzJob.JOBID));
108109
msgLogInfo.put(LOGKEYS.JOB_SEQ, params.get(TaskQuartzJob.JOBSEQ));
109110
msgLogInfo.put(LOGKEYS.RECEIVER_TYPE, LOGVALUES.RCV_TYPE_MQ);

src/main/java/com/xwintop/xTransfer/receiver/service/impl/ReceiverConfigServiceImpl.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package com.xwintop.xTransfer.receiver.service.impl;
22

33
import cn.hutool.core.lang.Singleton;
4-
import com.xwintop.xTransfer.messaging.MessageHandler;
4+
import com.xwintop.xTransfer.messaging.*;
55
import com.xwintop.xTransfer.receiver.bean.ReceiverConfig;
66
import com.xwintop.xTransfer.receiver.service.Receiver;
77
import com.xwintop.xTransfer.receiver.service.ReceiverConfigService;
@@ -44,6 +44,14 @@ public void executeReceiver(TaskConfig taskConfig, MessageHandler messageHandler
4444
Map<String, Object> params = new HashMap<>();
4545
params.put(TaskQuartzJob.JOBID, taskConfig.getName());
4646
params.put(TaskQuartzJob.JOBSEQ, taskConfig.getProperty(TaskQuartzJob.JOBSEQ));
47+
if (receiverConfigList.size() == 0) {
48+
IMessage msg = new DefaultMessage();
49+
IContext ctx = new DefaultContext();
50+
ctx.setMessage(msg);
51+
log.info("执行receiver为空:" + taskConfig.getName());
52+
messageHandler.handle(ctx);
53+
return;
54+
}
4755
for (ReceiverConfig receiverConfig : receiverConfigList) {
4856
if (StringUtils.isBlank(receiverConfig.getId())) {
4957
receiverConfig.setId(taskConfig.getName() + "_" + receiverIndex);

src/main/java/com/xwintop/xTransfer/receiver/service/impl/ReceiverEmailImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import lombok.Data;
1616
import lombok.extern.slf4j.Slf4j;
1717
import org.apache.commons.io.IOUtils;
18+
import org.apache.commons.lang3.ArrayUtils;
1819
import org.apache.commons.lang3.StringUtils;
1920
import org.apache.commons.lang3.time.DateFormatUtils;
2021
import org.springframework.context.annotation.Scope;
@@ -128,7 +129,7 @@ private void sendMsgToListeners(List msgList, String from, String to, String sub
128129
msgLogInfo.put(LOGKEYS.CHANNEL_IN_TYPE, LOGVALUES.CHANNEL_TYPE_EMAIL);
129130
msgLogInfo.put(LOGKEYS.CHANNEL_IN, receiverConfigEmail.getHost() + ":" + receiverConfigEmail.getPort());
130131
msgLogInfo.put(LOGKEYS.MSG_TAG, msg.getFileName());
131-
msgLogInfo.put(LOGKEYS.MSG_LENGTH, msg.getMessage().length);
132+
msgLogInfo.put(LOGKEYS.MSG_LENGTH, ArrayUtils.getLength(msg.getMessage()));
132133
msgLogInfo.put(LOGKEYS.JOB_ID, params.get(TaskQuartzJob.JOBID));
133134
msgLogInfo.put(LOGKEYS.JOB_SEQ, params.get(TaskQuartzJob.JOBSEQ));
134135
msgLogInfo.put(LOGKEYS.RECEIVER_TYPE, "emailReceiver");

0 commit comments

Comments
 (0)