11from typing import Union
22from pathlib import Path
33from zipfile import ZipFile
4- import asyncio
4+ import io
5+ import re
56import subprocess
67import tempfile
78import shutil
9+ import mimetypes
10+ import itertools
811import PIL
912from PIL import Image
1013try :
1114 from jxlpy import JXLImagePlugin
1215except ModuleNotFoundError :
1316 pass
1417
18+ import logging
19+ logger = logging .getLogger (__name__ )
20+
1521from .config import settings
1622
23+
24+ mimetypes .add_type ('image/jxl' , '.jxl' )
25+ def is_image (p : Union [str , Path ]):
26+ mime = mimetypes .guess_type (p , strict = False )[0 ]
27+ return False if mime is None else mime .partition ('/' )[0 ] == 'image'
28+
1729def convert_image (f_or_path , saveto : str , thumbnail = False ):
1830 try :
1931 with Image .open (f_or_path ) as im :
@@ -40,10 +52,10 @@ def extract_thumbnail(path: Union[str, Path], id: str, page: int, cache=False, c
4052 return saveto .relative_to (settings .thumb )
4153 saveto .parent .mkdir (parents = True , exist_ok = True )
4254 if path .is_dir ():
43- convert_image (sorted (filter (lambda p : p . suffix != '.txt' and not p . name . startswith ( '.' ) , path .iterdir ()))[page - 1 ], saveto , thumbnail = True )
44- elif path . suffix == '.zip' :
45- with ZipFile (path ) as z :
46- with z .open (list (filter (lambda z_info : not z_info .is_dir (), z .infolist ()))[page - 1 ].filename ) as f :
55+ convert_image (sorted (filter (is_image , path .iterdir ()))[page - 1 ], saveto , thumbnail = True )
56+ elif ArchiveFile . support_formats . fullmatch ( path . name ) :
57+ with ArchiveFile (path ) as z :
58+ with z .open (list (filter (lambda z_info : not z_info .is_dir () and is_image ( z_info . filename ) , z .infolist ()))[page - 1 ].filename ) as f :
4759 convert_image (f , saveto , thumbnail = True )
4860 else :
4961 raise NotImplementedError
@@ -52,4 +64,75 @@ def extract_thumbnail(path: Union[str, Path], id: str, page: int, cache=False, c
5264 cover_path .parent .mkdir (parents = True , exist_ok = True )
5365 shutil .copy2 (saveto , cover_path )
5466 return cover_path .name
55- return saveto .relative_to (settings .thumb )
67+ return saveto .relative_to (settings .thumb )
68+
69+ class ArchiveInfo :
70+ def __init__ (self , filename ):
71+ self .filename = filename
72+
73+ def is_dir (self ):
74+ return self ._is_dir
75+
76+ class ArchiveFile :
77+ support_formats = re .compile ('.+\.(zip|rar|7z)$' , re .IGNORECASE )
78+ executable = None
79+
80+ def __init__ (self , file ):
81+ self .file = Path (file )
82+ if self .file .suffix .lower () == '.zip' :
83+ self .zipfile = ZipFile (file )
84+ return
85+ else :
86+ self .zipfile = None
87+ self ._infolist = None
88+ if self .executable is None :
89+ sevenzip = ('7zzs' , '7zz' , '7z' , '7za' , '7zr' )
90+ for executable in itertools .chain (('./' + _s for _s in sevenzip ), sevenzip ):
91+ try :
92+ p = subprocess .run ([executable , 'i' ], capture_output = True )
93+ if p .returncode == 0 :
94+ self .executable = executable
95+ if b'Rar' not in p .stdout :
96+ logger .warning ("Your version of 7-Zip does not support rar files, please download the correct version from https://7-zip.org/download.html." )
97+ except FileNotFoundError :
98+ continue
99+ if self .executable is None :
100+ raise FileNotFoundError ("We encountered a non-zip archive, this requires 7-Zip, but you do not have it installed, please download from https://7-zip.org/download.html and make sure 7zzs or 7zz or 7z is in the working directory or the directory indicated by PATH." )
101+ logger .debug (f"Use 7-Zip { self .executable } " )
102+
103+ def infolist (self ):
104+ if not self ._infolist is None :
105+ return self ._infolist
106+ self ._infolist = []
107+ stdout = subprocess .run ([self .executable , 'l' , str (self .file )], check = True , capture_output = True , text = True ).stdout
108+ list_start = False
109+ line_sep = '------------------- ----- ------------ ------------ ------------------------'
110+ for line in stdout .splitlines ():
111+ if list_start :
112+ if line == line_sep :
113+ break
114+ m = re .match (r'\S+ \S+ ([\.D])\S{4} +\d+ +\d* (.+)' , line )
115+ if m is None : raise NotImplementedError
116+ archive_info = ArchiveInfo (m [2 ])
117+ archive_info ._is_dir = m [1 ] == 'D'
118+ self ._infolist .append (archive_info )
119+ if line == line_sep :
120+ list_start = True
121+ return self ._infolist
122+
123+ def namelist (self ):
124+ return (archive_info .filename for archive_info in self .infolist ())
125+
126+ def open (self , name ):
127+ p = subprocess .run ([self .executable , 'e' , '-so' , str (self .file ), name ], check = True , capture_output = True )
128+ return io .BytesIO (p .stdout )
129+
130+ def __enter__ (self ):
131+ if self .zipfile is None :
132+ return self
133+ else :
134+ return self .zipfile .__enter__ ()
135+
136+ def __exit__ (self , * args ):
137+ if not self .zipfile is None :
138+ self .zipfile .__exit__ (* args )
0 commit comments