77import com .xwintop .xTransfer .messaging .IMessage ;
88import lombok .Data ;
99import lombok .extern .slf4j .Slf4j ;
10+ import org .apache .commons .compress .archivers .ArchiveEntry ;
11+ import org .apache .commons .compress .archivers .ArchiveOutputStream ;
12+ import org .apache .commons .compress .archivers .ArchiveStreamFactory ;
13+ import org .apache .commons .compress .archivers .ar .ArArchiveEntry ;
14+ import org .apache .commons .compress .archivers .ar .ArArchiveOutputStream ;
15+ import org .apache .commons .compress .archivers .cpio .CpioArchiveEntry ;
16+ import org .apache .commons .compress .archivers .cpio .CpioArchiveOutputStream ;
17+ import org .apache .commons .compress .archivers .sevenz .SevenZArchiveEntry ;
18+ import org .apache .commons .compress .archivers .sevenz .SevenZOutputFile ;
19+ import org .apache .commons .compress .archivers .tar .TarArchiveEntry ;
20+ import org .apache .commons .compress .archivers .tar .TarArchiveOutputStream ;
21+ import org .apache .commons .compress .compressors .CompressorOutputStream ;
22+ import org .apache .commons .compress .compressors .CompressorStreamFactory ;
23+ import org .apache .commons .compress .compressors .gzip .GzipCompressorOutputStream ;
24+ import org .apache .commons .compress .compressors .gzip .GzipParameters ;
25+ import org .apache .commons .compress .utils .SeekableInMemoryByteChannel ;
1026import org .apache .commons .lang3 .StringUtils ;
1127import org .springframework .context .annotation .Scope ;
1228import org .springframework .stereotype .Service ;
1329
1430import java .io .ByteArrayOutputStream ;
15- import java .util . Iterator ;
31+ import java .io . File ;
1632import java .util .List ;
1733import java .util .Map ;
18- import java .util .zip .GZIPOutputStream ;
19- import java .util .zip .ZipEntry ;
20- import java .util .zip .ZipOutputStream ;
2134
2235/**
2336 * @ClassName: FilterCompressImpl
@@ -35,71 +48,102 @@ public class FilterCompressImpl implements Filter {
3548
3649 @ Override
3750 public void doFilter (IContext ctx , Map params ) throws Exception {
38- String zipType = null ;
3951 IMessage msg = ctx .getMessages ().get (0 );
40- Map args = filterConfigCompress .getArgs ();
41- log .debug ("执行了压缩动作" );
42- if (msg != null ) {
43- zipType = msg .getProperties ().getProperty ("COMPRESS_METHOD" );
44- if (StringUtils .isBlank (zipType )) {
45- zipType = StringUtils .defaultIfBlank (filterConfigCompress .getMethod (), "zip" );
52+ if (StringUtils .isNotBlank (filterConfigCompress .getFileNameFilterRegex ())) {
53+ String fileNameFilterRegexGroup = filterConfigCompress .getFileNameFilterRegexGroup ();
54+ if (StringUtils .isEmpty (fileNameFilterRegexGroup )) {
55+ fileNameFilterRegexGroup = "defaultRegexGroup" ;
56+ }
57+ if ("?!" .equals (filterConfigCompress .getFileNameFilterRegex ())) {
58+ if (msg .checkFileNameFilterRegexGroup (fileNameFilterRegexGroup )) {
59+ log .info ("Filter:" + filterConfigCompress .getId () + "跳过fileName:" + msg .getFileName ());
60+ return ;
61+ }
62+ } else {
63+ if (!msg .getFileName ().matches (filterConfigCompress .getFileNameFilterRegex ())) {
64+ log .info ("Filter:" + filterConfigCompress .getId () + "跳过fileName:" + msg .getFileName ());
65+ return ;
66+ }
67+ msg .addFileNameFilterRegexGroup (fileNameFilterRegexGroup );
4668 }
4769 }
70+ // Map args = filterConfigCompress.getArgs();
71+ String zipType = msg .getProperties ().getProperty ("COMPRESS_METHOD" );
4872 if (StringUtils .isBlank (zipType )) {
49- zipType = "zip" ;
73+ zipType = StringUtils . defaultIfBlank ( filterConfigCompress . getMethod (), "zip" ) ;
5074 }
51- log . debug ( "begin unzip data,zip type is:" + zipType ) ;
52- byte [] zippedData ;
53- if ( " zip" . equalsIgnoreCase ( zipType )) {
54- List rawList = ctx . getMessages ();
55- zippedData = this .zip ( rawList );
75+ byte [] zippedData = null ;
76+ if ( new ArchiveStreamFactory (). getOutputStreamArchiveNames (). contains ( zipType . toLowerCase ())) {
77+ zippedData = this . zip ( ctx . getMessages (), zipType );
78+ } else if ( CompressorStreamFactory . getSingleton (). getOutputStreamCompressorNames (). contains ( zipType . toLowerCase ())) {
79+ zippedData = this .gzip ( msg , zipType );
5680 } else {
57- zippedData = this .gzip (msg );
81+ log .warn ("未支持该压缩格式:" + zipType + " Filter:" + filterConfigCompress .getId () + " fileName:" + msg .getFileName ());
82+ return ;
5883 }
5984 msg .getProperties ().put ("COMPRESS_METHOD" , zipType );
6085 if (filterConfigCompress .isAddPostfixName ()) {
86+ msg .getProperties ().put ("ZIP_FILE_NAME" , msg .getFileName ());
6187 msg .setFileName (msg .getFileName () + "." + zipType );
6288 }
63- log .debug ( " success zip data." );
89+ log .info ( "Filter:" + filterConfigCompress . getId () + " success zip data." + msg . getFileName () );
6490 msg .setMessage (zippedData );
6591 ctx .setMessage (msg );
6692 }
6793
68- private byte [] zip (List list ) throws Exception {
69- ByteArrayOutputStream baos = new ByteArrayOutputStream ();
70- if (list != null && list .size () > 0 ) {
71- ZipOutputStream zout = new ZipOutputStream (baos );
72- for (Iterator it = list .iterator (); it .hasNext (); ) {
73- IMessage msg = (IMessage ) it .next ();
74- this .zip (msg , zout );
94+ private byte [] zip (List <IMessage > list , String zipType ) throws Exception {
95+ if (ArchiveStreamFactory .SEVEN_Z .equalsIgnoreCase (zipType )) {
96+ SeekableInMemoryByteChannel seekableByteChannel = new SeekableInMemoryByteChannel ();
97+ SevenZOutputFile sevenZOutput = new SevenZOutputFile (seekableByteChannel );
98+ for (IMessage msg : list ) {
99+ SevenZArchiveEntry entry = new SevenZArchiveEntry ();
100+ entry .setName (msg .getFileName ());
101+ sevenZOutput .putArchiveEntry (entry );
102+ sevenZOutput .write (msg .getMessage ());
103+ sevenZOutput .closeArchiveEntry ();
75104 }
76- zout .close ();
77- zout = null ;
78- } else {
79- log .warn ("no message for compress..." );
105+ sevenZOutput .close ();
106+ return seekableByteChannel .array ();
80107 }
81- byte zippedData [] = baos .toByteArray ();
82- baos .close ();
83- baos = null ;
84- return zippedData ;
85- }
86-
87-
88- private void zip (IMessage msg , ZipOutputStream zout ) throws Exception {
89- zout .putNextEntry (new ZipEntry (msg .getFileName ()));
90- zout .write (msg .getMessage ());
108+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream ();
109+ ArchiveOutputStream archiveOutputStream = new ArchiveStreamFactory ().createArchiveOutputStream (zipType , outputStream );
110+ for (IMessage msg : list ) {
111+ if (archiveOutputStream instanceof ArArchiveOutputStream ) {
112+ ((ArArchiveOutputStream ) archiveOutputStream ).setLongFileMode (ArArchiveOutputStream .LONGFILE_BSD );
113+ }
114+ ArchiveEntry entry = null ;
115+ if (archiveOutputStream instanceof ArArchiveOutputStream ) {
116+ entry = new ArArchiveEntry (msg .getFileName (), msg .getMessage ().length );
117+ } else if (archiveOutputStream instanceof TarArchiveOutputStream ) {
118+ entry = new TarArchiveEntry (msg .getFileName ());
119+ ((TarArchiveEntry ) entry ).setSize (msg .getMessage ().length );
120+ } else if (archiveOutputStream instanceof CpioArchiveOutputStream ) {
121+ entry = new CpioArchiveEntry (msg .getFileName ());
122+ ((CpioArchiveEntry ) entry ).setSize (msg .getMessage ().length );
123+ } else {
124+ entry = archiveOutputStream .createArchiveEntry (new File (msg .getFileName ()), msg .getFileName ());
125+ }
126+ archiveOutputStream .putArchiveEntry (entry );
127+ archiveOutputStream .write (msg .getMessage ());
128+ archiveOutputStream .closeArchiveEntry ();
129+ }
130+ archiveOutputStream .close ();
131+ return outputStream .toByteArray ();
91132 }
92133
93- private byte [] gzip (IMessage msg ) throws Exception {
94- ByteArrayOutputStream baos = new ByteArrayOutputStream ();
95- GZIPOutputStream gout = new GZIPOutputStream (baos );
96- gout .write (msg .getMessage ());
97- gout .close ();
98- gout = null ;
99- byte [] zipData = baos .toByteArray ();
100- baos .close ();
101- baos = null ;
102- return zipData ;
134+ private byte [] gzip (IMessage msg , String zipType ) throws Exception {
135+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream ();
136+ CompressorOutputStream compressorOutputStream ;
137+ if (CompressorStreamFactory .GZIP .equalsIgnoreCase (zipType )) {
138+ GzipParameters gzipParameters = new GzipParameters ();
139+ gzipParameters .setFilename (msg .getFileName ());
140+ compressorOutputStream = new GzipCompressorOutputStream (outputStream , gzipParameters );
141+ } else {
142+ compressorOutputStream = CompressorStreamFactory .getSingleton ().createCompressorOutputStream (zipType , outputStream );
143+ }
144+ compressorOutputStream .write (msg .getMessage ());
145+ compressorOutputStream .close ();
146+ return outputStream .toByteArray ();
103147 }
104148
105149 @ Override
0 commit comments