Skip to content

feat(drivers/weiyun_open): support weiyun official mcp api #2305

Draft
yokinanya wants to merge 3 commits into
OpenListTeam:main from
yokinanya:codex/add-weiyun-open-driver
Draft

feat(drivers/weiyun_open): support weiyun official mcp api #2305
yokinanya wants to merge 3 commits into
OpenListTeam:main from
yokinanya:codex/add-weiyun-open-driver

Conversation

@yokinanya
Copy link
Copy Markdown

@yokinanya yokinanya commented Apr 3, 2026

Description / 描述

  • 新增 WeiYun Open 存储驱动,基于微云官方 MCP 接口实现

暂无权限添加标签,需要管理员处理

Motivation and Context / 背景

使用官方MCP接口,可以避免cookie过期的问题

How Has This Been Tested? / 测试

已执行以下测试:

  • go test ./drivers/weiyun_open
  • go test ./drivers

Checklist / 检查清单

  • I have read the CONTRIBUTING document.
    我已阅读 CONTRIBUTING 文档。
  • I have formatted my code with go fmt or prettier.
    我已使用 go fmt 或 prettier 格式化提交的代码。
  • I have added appropriate labels to this PR (or mentioned needed labels in the description if lacking permissions).
    我已为此 PR 添加了适当的标签(如无权限或需要的标签不存在,请在描述中说明,管理员将后续处理)。
  • I have requested review from relevant code authors using the "Request review" feature when applicable.
    我已在适当情况下使用"Request review"功能请求相关代码作者进行审查。
  • I have updated the repository accordingly (If it’s needed).
    我已相应更新了相关仓库(若适用)。

@j2rong4cn
Copy link
Copy Markdown
Member

分片上传参考可以这个,stream.NewStreamSectionReader + errgroup.NewOrderedGroupWithContext

// Upload uploads file to the open platform in fixed-size slices. It uses a
// stream section reader to avoid buffering the whole file, and an ordered
// errgroup to bound upload concurrency with per-slice retry (3 attempts,
// backoff delay). Progress is reported through up.
//
// NOTE(review): createResp.Data.Servers is indexed without a length check —
// confirm the create-upload API guarantees at least one server entry.
func (d *Open123) Upload(ctx context.Context, file model.FileStreamer, createResp *UploadCreateResp, up driver.UpdateProgress) error {
	uploadDomain := createResp.Data.Servers[0]
	size := file.GetSize()
	chunkSize := createResp.Data.SliceSize
	ss, err := stream.NewStreamSectionReader(file, int(chunkSize), &up)
	if err != nil {
		return err
	}
	// Number of slices, rounding the last partial chunk up.
	uploadNums := (size + chunkSize - 1) / chunkSize
	thread := min(int(uploadNums), d.UploadThread)
	threadG, uploadCtx := errgroup.NewOrderedGroupWithContext(ctx, thread,
		retry.Attempts(3),
		retry.Delay(time.Second),
		retry.DelayType(retry.BackOffDelay))
	for partIndex := range uploadNums {
		if utils.IsCanceled(uploadCtx) {
			break
		}
		partIndex := partIndex
		partNumber := partIndex + 1 // slice numbers start at 1
		offset := partIndex * chunkSize
		size := min(chunkSize, size-offset) // last slice may be short
		var reader io.ReadSeeker
		var rateLimitedRd io.Reader
		sliceMD5 := ""
		// Reusable buffer for the multipart form head/tail and the response body.
		b := bytes.NewBuffer(make([]byte, 0, 2048))
		threadG.GoWithLifecycle(errgroup.Lifecycle{
			Before: func(ctx context.Context) (err error) {
				reader, err = ss.GetSectionReader(offset, size)
				return
			},
			Do: func(ctx context.Context) (err error) {
				// Rewind before each attempt: a failed attempt may have
				// partially consumed the reader. The original code ignored
				// Seek errors, which could silently upload wrong data.
				if _, err = reader.Seek(0, io.SeekStart); err != nil {
					return err
				}
				if sliceMD5 == "" {
					// Hash lazily here, inside Do, so the expensive read does
					// not block the other goroutines in Before.
					sliceMD5, err = utils.HashReader(utils.MD5, reader)
					if err != nil {
						return err
					}
					if _, err = reader.Seek(0, io.SeekStart); err != nil {
						return err
					}
				}
				b.Reset()
				w := multipart.NewWriter(b)
				// Form fields.
				err = w.WriteField("preuploadID", createResp.Data.PreuploadID)
				if err != nil {
					return err
				}
				err = w.WriteField("sliceNo", strconv.FormatInt(partNumber, 10))
				if err != nil {
					return err
				}
				err = w.WriteField("sliceMD5", sliceMD5)
				if err != nil {
					return err
				}
				// Write only the file-part header into b; the slice content is
				// streamed from reader between head and tail below, so the
				// chunk is never copied into memory.
				_, err = w.CreateFormFile("slice", fmt.Sprintf("%s.part%d", file.GetName(), partNumber))
				if err != nil {
					return err
				}
				headSize := b.Len()
				err = w.Close()
				if err != nil {
					return err
				}
				head := bytes.NewReader(b.Bytes()[:headSize])
				tail := bytes.NewReader(b.Bytes()[headSize:])
				rateLimitedRd = driver.NewLimitedUploadStream(ctx, io.MultiReader(head, reader, tail))
				token, err := d.getAccessToken(false)
				if err != nil {
					return err
				}
				// Build the request with headers.
				req, err := http.NewRequestWithContext(ctx, http.MethodPost, uploadDomain+"/upload/v2/file/slice", rateLimitedRd)
				if err != nil {
					return err
				}
				req.Header.Add("Authorization", "Bearer "+token)
				req.Header.Add("Content-Type", w.FormDataContentType())
				req.Header.Add("Platform", "open_platform")
				res, err := base.HttpClient.Do(req)
				if err != nil {
					return err
				}
				defer res.Body.Close()
				if res.StatusCode != 200 {
					return fmt.Errorf("slice %d upload failed, status code: %d", partNumber, res.StatusCode)
				}
				// Reuse b (the form is already sent) to read the response.
				b.Reset()
				_, err = b.ReadFrom(res.Body)
				if err != nil {
					return err
				}
				var resp BaseResp
				err = json.Unmarshal(b.Bytes(), &resp)
				if err != nil {
					return err
				}
				if resp.Code != 0 {
					return fmt.Errorf("slice %d upload failed: %s", partNumber, resp.Message)
				}
				progress := 100 * float64(threadG.Success()+1) / float64(uploadNums+1)
				up(progress)
				return nil
			},
			After: func(err error) {
				ss.FreeSectionReader(reader)
			},
		})
	}
	if err := threadG.Wait(); err != nil {
		return err
	}
	return nil
}

@yokinanya
Copy link
Copy Markdown
Author

分片上传参考可以这个,stream.NewStreamSectionReader + errgroup.NewOrderedGroupWithContext

// Upload uploads file to the open platform in fixed-size slices, using a
// stream section reader plus an ordered errgroup with per-slice retry
// (3 attempts, backoff). Progress is reported through up.
// NOTE(review): createResp.Data.Servers is indexed without a length check —
// confirm the create API guarantees at least one server.
func (d *Open123) Upload(ctx context.Context, file model.FileStreamer, createResp *UploadCreateResp, up driver.UpdateProgress) error {
uploadDomain := createResp.Data.Servers[0]
size := file.GetSize()
chunkSize := createResp.Data.SliceSize
ss, err := stream.NewStreamSectionReader(file, int(chunkSize), &up)
if err != nil {
return err
}
// Slice count, rounding the last partial chunk up.
uploadNums := (size + chunkSize - 1) / chunkSize
thread := min(int(uploadNums), d.UploadThread)
threadG, uploadCtx := errgroup.NewOrderedGroupWithContext(ctx, thread,
retry.Attempts(3),
retry.Delay(time.Second),
retry.DelayType(retry.BackOffDelay))
for partIndex := range uploadNums {
if utils.IsCanceled(uploadCtx) {
break
}
partIndex := partIndex
partNumber := partIndex + 1 // slice numbers start at 1
offset := partIndex * chunkSize
size := min(chunkSize, size-offset) // last slice may be short
var reader io.ReadSeeker
var rateLimitedRd io.Reader
sliceMD5 := ""
// Reusable buffer for the multipart form head/tail and the response body.
b := bytes.NewBuffer(make([]byte, 0, 2048))
threadG.GoWithLifecycle(errgroup.Lifecycle{
Before: func(ctx context.Context) (err error) {
reader, err = ss.GetSectionReader(offset, size)
return
},
Do: func(ctx context.Context) (err error) {
// Rewind for retries; a failed attempt may have consumed the reader.
// NOTE(review): Seek errors are ignored here and below — consider checking them.
reader.Seek(0, io.SeekStart)
if sliceMD5 == "" {
// Do the expensive hashing here so it does not block other goroutines.
sliceMD5, err = utils.HashReader(utils.MD5, reader)
if err != nil {
return err
}
reader.Seek(0, io.SeekStart)
}
b.Reset()
w := multipart.NewWriter(b)
// Form fields.
err = w.WriteField("preuploadID", createResp.Data.PreuploadID)
if err != nil {
return err
}
err = w.WriteField("sliceNo", strconv.FormatInt(partNumber, 10))
if err != nil {
return err
}
err = w.WriteField("sliceMD5", sliceMD5)
if err != nil {
return err
}
// Write only the file-part header; the slice content itself is streamed
// from reader between head and tail below, never copied into memory.
_, err = w.CreateFormFile("slice", fmt.Sprintf("%s.part%d", file.GetName(), partNumber))
if err != nil {
return err
}
headSize := b.Len()
err = w.Close()
if err != nil {
return err
}
head := bytes.NewReader(b.Bytes()[:headSize])
tail := bytes.NewReader(b.Bytes()[headSize:])
rateLimitedRd = driver.NewLimitedUploadStream(ctx, io.MultiReader(head, reader, tail))
token, err := d.getAccessToken(false)
if err != nil {
return err
}
// Build the request and set headers.
req, err := http.NewRequestWithContext(ctx, http.MethodPost, uploadDomain+"/upload/v2/file/slice", rateLimitedRd)
if err != nil {
return err
}
// Request headers.
req.Header.Add("Authorization", "Bearer "+token)
req.Header.Add("Content-Type", w.FormDataContentType())
req.Header.Add("Platform", "open_platform")
res, err := base.HttpClient.Do(req)
if err != nil {
return err
}
defer res.Body.Close()
if res.StatusCode != 200 {
return fmt.Errorf("slice %d upload failed, status code: %d", partNumber, res.StatusCode)
}
// Reuse b (form already sent) to read the response body.
b.Reset()
_, err = b.ReadFrom(res.Body)
if err != nil {
return err
}
var resp BaseResp
err = json.Unmarshal(b.Bytes(), &resp)
if err != nil {
return err
}
if resp.Code != 0 {
return fmt.Errorf("slice %d upload failed: %s", partNumber, resp.Message)
}
progress := 100 * float64(threadG.Success()+1) / float64(uploadNums+1)
up(progress)
return nil
},
After: func(err error) {
ss.FreeSectionReader(reader)
},
})
}
if err := threadG.Wait(); err != nil {
return err
}
return nil
}

改好了

@j2rong4cn j2rong4cn marked this pull request as draft May 12, 2026 17:14
@j2rong4cn j2rong4cn marked this pull request as draft May 12, 2026 17:14
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants