Skip to content

Commit 3074dc1

Browse files
committed
添加hdfs传输工具。
1 parent 6b7ac82 commit 3074dc1

8 files changed

Lines changed: 382 additions & 0 deletions

File tree

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,7 @@ Receiver接收器:
159159
| RabbitMq | ReceiverConfigRabbitMq| 使用RabbitMq协议接收消息 |
160160
| RocketMq | ReceiverConfigRocketMq| 使用RocketMq协议接收消息 |
161161
| ActiveMq | ReceiverConfigActiveMq| 使用ActiveMq协议接收消息 |
162+
| Hdfs | ReceiverConfigHdfs | 使用HDFS协议接收消息 |
162163

163164
Filter处理器:
164165

@@ -190,6 +191,7 @@ Sender发送器:
190191
| RabbitMq | SenderConfigRabbitMq| 使用RabbitMq协议发送消息 |
191192
| RocketMq | SenderConfigRocketMq| 使用RocketMq协议发送消息 |
192193
| ActiveMq | SenderConfigActiveMq| 使用ActiveMq协议发送消息 |
194+
| Hdfs | SenderConfigHdfs | 使用HDFS协议发送消息 |
193195

194196

195197
项目开发中,以后会陆续添加新工具,欢迎大家参与其中,多提提意见,谢谢。

pom.xml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,18 @@
197197
<version>0.11</version>
198198
</dependency>
199199

200+
<!-- Hadoop开发 -->
201+
<dependency>
202+
<groupId>org.apache.hadoop</groupId>
203+
<artifactId>hadoop-common</artifactId>
204+
<version>2.8.0</version>
205+
</dependency>
206+
<dependency>
207+
<groupId>org.apache.hadoop</groupId>
208+
<artifactId>hadoop-hdfs</artifactId>
209+
<version>2.8.0</version>
210+
</dependency>
211+
200212
<!--excel操作工具-->
201213
<dependency>
202214
<groupId>org.apache.poi</groupId>

src/main/java/com/xwintop/xJavaFxTool/controller/developTools/xTransferTool/TransferToolTaskViewController.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ public void changed(ObservableValue<? extends String> observable, String oldValu
110110
"ReceiverConfigRabbitMq",
111111
"ReceiverConfigRocketMq",
112112
"ReceiverConfigActiveMq",
113+
"ReceiverConfigHdfs",
113114
"ReceiverConfigSftp"};
114115
for (String className : classNameS) {
115116
MenuItem menuAdd = new MenuItem(className);
@@ -237,6 +238,7 @@ public void changed(ObservableValue<? extends String> observable, String oldValu
237238
"SenderConfigRabbitMq",
238239
"SenderConfigRocketMq",
239240
"SenderConfigActiveMq",
241+
"SenderConfigHdfs",
240242
"SenderConfigSftp"
241243
};
242244
for (String className : classNameS) {

src/main/java/com/xwintop/xTransfer/common/model/LOGVALUES.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,13 @@ public class LOGVALUES {
55
public static String EVENT_MSG_SENDED = "EVENT.MSG.SENDED";
66
public static String EVENT_MSG_FILTER = "EVENT.MSG.FILTER";
77
public static String RCV_TYPE_FS = "fsReceiver";
8+
public static String RCV_TYPE_HDFS = "hdfsReceiver";
89
public static String RCV_TYPE_MQ = "mqReceiver";
910
public static String RCV_TYPE_JMS = "jmsReceiver";
1011
public static String RCV_TYPE_FTP = "ftpReceiver";
1112
public static String RCV_TYPE_SFTP = "sftpReceiver";
1213
public static String CHANNEL_TYPE_FS = "FS";
14+
public static String CHANNEL_TYPE_HDFS = "HDFS";
1315
public static String CHANNEL_TYPE_MQ = "MQ";
1416
public static String CHANNEL_TYPE_JMS = "JMS";
1517
public static String CHANNEL_TYPE_FTP = "FTP";
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package com.xwintop.xTransfer.receiver.bean;
2+
3+
import lombok.Data;
4+
5+
import java.util.HashMap;
6+
import java.util.Map;
7+
8+
/**
9+
* @ClassName: ReceiverConfigFs
10+
* @Description: Hdfs接收器配置
11+
* @author: xufeng
12+
* @date: 2019/7/5 16:21
13+
*/
14+
15+
@Data
16+
public class ReceiverConfigHdfs implements ReceiverConfig {
17+
private String serviceName = "receiverHdfs";//对应服务名称
18+
private String id;//如果留空则系统自动分配
19+
/** 是否开启 */
20+
private boolean enable = true;
21+
/** 是否异步执行 */
22+
private boolean async = false;
23+
/** 是否发生异常时退出任务 */
24+
private boolean exceptionExit = false;
25+
26+
private String hdfsUrl = "hdfs://localhost:9000";//hdfs服务地址
27+
/** 接收目录 */
28+
private String pathIn;
29+
/** 一次最大处理数 */
30+
private int max = 100;
31+
/** 文件最大大小 */
32+
private long maxSize = 4194304;
33+
/** 编码 */
34+
private String encoding = "utf-8";
35+
/** 是否删除原文件 */
36+
private boolean delReceiveFile = true;
37+
/** 文件名过滤正则表达式 */
38+
private String fileNameRegex;
39+
/** 是否扫描子目录 */
40+
private boolean includeSubdirectory = false;
41+
/** 延时过滤时间,单位为毫秒 */
42+
private long delayTime = 0;
43+
/** 过滤最小文件 */
44+
private long minSize;
45+
46+
private Map args = new HashMap();//自定义参数
47+
}
Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
package com.xwintop.xTransfer.receiver.service.impl;
2+
3+
import com.xwintop.xTransfer.common.MsgLogger;
4+
import com.xwintop.xTransfer.common.model.LOGKEYS;
5+
import com.xwintop.xTransfer.common.model.LOGVALUES;
6+
import com.xwintop.xTransfer.common.model.Msg;
7+
import com.xwintop.xTransfer.messaging.*;
8+
import com.xwintop.xTransfer.receiver.bean.ReceiverConfig;
9+
import com.xwintop.xTransfer.receiver.bean.ReceiverConfigHdfs;
10+
import com.xwintop.xTransfer.receiver.service.Receiver;
11+
import com.xwintop.xTransfer.task.quartz.TaskQuartzJob;
12+
import lombok.extern.slf4j.Slf4j;
13+
import org.apache.commons.collections.MapUtils;
14+
import org.apache.commons.lang3.ArrayUtils;
15+
import org.apache.commons.lang3.StringUtils;
16+
import org.apache.hadoop.conf.Configuration;
17+
import org.apache.hadoop.fs.*;
18+
import org.springframework.context.annotation.Scope;
19+
import org.springframework.stereotype.Service;
20+
21+
import java.io.ByteArrayOutputStream;
22+
import java.nio.file.NoSuchFileException;
23+
import java.util.ArrayList;
24+
import java.util.Iterator;
25+
import java.util.List;
26+
import java.util.Map;
27+
28+
/**
29+
* @ClassName: ReceiverFsImpl
30+
* @Description: Fs接收器
31+
* @author: xufeng
32+
* @date: 2018/6/13 16:11
33+
*/
34+
35+
@Service("receiverFs")
36+
@Scope("prototype")
37+
@Slf4j
38+
public class ReceiverHdfsImpl implements Receiver {
39+
private ReceiverConfigHdfs receiverConfigHdfs;
40+
private MessageHandler messageHandler;
41+
42+
@Override
43+
public void receive(Map params) throws Exception {
44+
receiveInternalByFiles(receiverConfigHdfs.getPathIn(), params, 0);
45+
}
46+
47+
private void receiveInternalByFiles(String pathIn, Map params, int receivedFileSum) throws Exception {
48+
pathIn = StringUtils.appendIfMissing(pathIn, "/", "/", "\\");
49+
Configuration conf = new Configuration();
50+
conf.set("fs.defaultFS", receiverConfigHdfs.getHdfsUrl());
51+
FileSystem hdfs = FileSystem.get(conf);
52+
Path pathin = new Path(pathIn);
53+
if (!hdfs.exists(pathin) || !hdfs.isDirectory(pathin)) {
54+
log.error("path: " + pathIn + "not exist or not a dirctory!");
55+
return;
56+
}
57+
RemoteIterator<FileStatus> pathIterator = hdfs.listStatusIterator(pathin);
58+
List<FileStatus> filePaths = new ArrayList<>();
59+
try {
60+
while (pathIterator.hasNext()) {
61+
if (receiverConfigHdfs.getMax() >= 0 && receiverConfigHdfs.getMax() <= receivedFileSum) {
62+
break;
63+
}
64+
FileStatus curPath = pathIterator.next();
65+
Path curFile = curPath.getPath();
66+
if (curPath.isDirectory()) {
67+
String curFileName = curFile.getName();
68+
if (StringUtils.isNotBlank(receiverConfigHdfs.getFileNameRegex())) {
69+
if (!curFileName.matches(receiverConfigHdfs.getFileNameRegex())) {
70+
continue;
71+
}
72+
}
73+
if (receiverConfigHdfs.getMaxSize() != -1 && curPath.getLen() > receiverConfigHdfs.getMaxSize()) {
74+
log.warn(curFile.toString() + ": size is exceed the limit [" + receiverConfigHdfs.getMaxSize() + "]. fileSize:" + curPath.getLen());
75+
continue;
76+
}
77+
if (receiverConfigHdfs.getDelayTime() > 0) {
78+
if (System.currentTimeMillis() - curPath.getModificationTime() < receiverConfigHdfs.getDelayTime()) {
79+
log.info("File is more new than currentTime-" + receiverConfigHdfs.getDelayTime());
80+
continue;
81+
}
82+
}
83+
if (receiverConfigHdfs.getMinSize() != -1) {
84+
if (curPath.getLen() < receiverConfigHdfs.getMinSize()) {
85+
log.info("File's Size is too small then skip it,file name is:" + curFile.getName() + " fileSize:" + curPath.getLen());
86+
continue;
87+
}
88+
}
89+
IMessage msg = new DefaultMessage();
90+
String uuid = msg.getId();
91+
log.info("begin receiving file:" + curFileName + "size=" + curPath.getLen() + ",id:" + uuid);
92+
if (!hdfs.exists(curFile)) {
93+
log.warn("curFile is not exist! file:" + curFile.toString());
94+
continue;
95+
}
96+
try {
97+
try {
98+
FSDataInputStream is = hdfs.open(curFile);
99+
ByteArrayOutputStream fout = new ByteArrayOutputStream();
100+
byte[] b = new byte[2048];
101+
int len = 0;
102+
while ((len = is.read(b)) != -1) {
103+
fout.write(b, 0, len);
104+
}
105+
msg.setRawData(fout.toByteArray());
106+
} catch (NoSuchFileException n) {
107+
log.warn("read optFile is not exist! file:" + curFile.toString());
108+
continue;
109+
}
110+
msg.setFileName(curFileName);
111+
msg.setEncoding(receiverConfigHdfs.getEncoding());
112+
msg.setProperty(LOGKEYS.CHANNEL_IN_TYPE, LOGVALUES.CHANNEL_TYPE_HDFS);
113+
msg.setProperty(LOGKEYS.CHANNEL_IN, pathIn);
114+
msg.setProperty(LOGKEYS.RECEIVER_TYPE, LOGVALUES.RCV_TYPE_FS);
115+
msg.setProperty(LOGKEYS.RECEIVER_ID, this.receiverConfigHdfs.getId());
116+
if (MapUtils.isNotEmpty(receiverConfigHdfs.getArgs())) {
117+
msg.getProperties().putAll(receiverConfigHdfs.getArgs());
118+
}
119+
IContext ctx = new DefaultContext();
120+
ctx.setMessage(msg);
121+
122+
Msg msgLogInfo = new Msg(LOGVALUES.EVENT_MSG_RECEIVED, uuid, null);
123+
msgLogInfo.put(LOGKEYS.CHANNEL_IN_TYPE, LOGVALUES.CHANNEL_TYPE_HDFS);
124+
msgLogInfo.put(LOGKEYS.CHANNEL_IN, pathIn);
125+
msgLogInfo.put(LOGKEYS.MSG_TAG, curFileName);
126+
msgLogInfo.put(LOGKEYS.MSG_LENGTH, ArrayUtils.getLength(msg.getMessage()));
127+
msgLogInfo.put(LOGKEYS.JOB_ID, params.get(TaskQuartzJob.JOBID));
128+
msgLogInfo.put(LOGKEYS.JOB_SEQ, params.get(TaskQuartzJob.JOBSEQ));
129+
msgLogInfo.put(LOGKEYS.RECEIVER_TYPE, LOGVALUES.RCV_TYPE_HDFS);
130+
msgLogInfo.put(LOGKEYS.RECEIVER_ID, this.receiverConfigHdfs.getId());
131+
MsgLogger.info(msgLogInfo.toMap());
132+
133+
messageHandler.handle(ctx);
134+
} catch (Exception e) {
135+
log.error("ReceiverFs 读取文件失败:", e);
136+
}
137+
if (receiverConfigHdfs.isDelReceiveFile()) {
138+
hdfs.deleteOnExit(curFile);
139+
}
140+
log.info("received file:" + curFileName);
141+
receivedFileSum++;
142+
} else {
143+
if (receiverConfigHdfs.isIncludeSubdirectory() && curPath.isDirectory() && receivedFileSum < receiverConfigHdfs.getMax()) {
144+
filePaths.add(curPath);
145+
}
146+
}
147+
}
148+
} finally {
149+
if (hdfs != null) {
150+
hdfs.close();
151+
}
152+
}
153+
//is directory then do
154+
if (receiverConfigHdfs.isIncludeSubdirectory() && receivedFileSum < receiverConfigHdfs.getMax()) {
155+
for (Iterator<FileStatus> it = filePaths.iterator(); it.hasNext(); ) {
156+
String curPathIn = it.next().getPath().toUri().getPath();
157+
receiveInternalByFiles(curPathIn, params, receivedFileSum);
158+
}
159+
}
160+
}
161+
162+
@Override
163+
public void setReceiverConfig(ReceiverConfig receiverConfig) {
164+
this.receiverConfigHdfs = (ReceiverConfigHdfs) receiverConfig;
165+
}
166+
167+
@Override
168+
public void setMessageHandler(MessageHandler messageHandler) {
169+
this.messageHandler = messageHandler;
170+
}
171+
172+
@Override
173+
public void destroy() {
174+
175+
}
176+
177+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package com.xwintop.xTransfer.sender.bean;
2+
3+
import lombok.Data;
4+
5+
/**
6+
* @ClassName: SenderConfigHdfs
7+
* @Description: Hdfs发送配置
8+
* @author: xufeng
9+
* @date: 2019/7/5 17:23
10+
*/
11+
@Data
12+
public class SenderConfigHdfs implements SenderConfig {
13+
private String serviceName = "senderHdfs";//对应服务名称
14+
private String id;//如果留空则系统自动分配
15+
private boolean enable = true;//是否开启
16+
private boolean async = false;//是否异步执行
17+
/** 是否发生异常时退出任务 */
18+
private boolean exceptionExit = true;
19+
private String fileNameFilterRegex;//文件名过滤正则表达式
20+
21+
private String hdfsUrl = "hdfs://localhost:9000";//hdfs服务地址
22+
/** 发送目录 */
23+
private String path;
24+
private String fileName;//文件名,支持变量替换。
25+
/** 是否创建目录 */
26+
private boolean createPathFlag = false;
27+
/** 是否覆盖重名文件 */
28+
private boolean overload = false;
29+
}

0 commit comments

Comments
 (0)