Browse Source

update

master
RealXLFD 1 month ago
parent
commit
584f24e797
  1. 0
      cliapps/dcom/exec.go
  2. 0
      cliapps/dcom/init.go
  3. 2
      cliapps/ffmpeg/options.go
  4. 26
      net/apps/mdown/basic.go
  5. 45
      net/apps/mdown/errors.go
  6. 32
      net/apps/mdown/range.go
  7. 218
      net/apps/mdown/target.go
  8. 5
      net/apps/mdown/thread.go
  9. 34
      net/apps/multithread_downloader/client_option.go
  10. 50
      net/apps/multithread_downloader/control.go
  11. 268
      net/apps/multithread_downloader/downloader.go
  12. 20
      net/apps/multithread_downloader/range.go
  13. 41
      net/apps/multithread_downloader/task_options.go
  14. 116
      net/apps/multithread_downloader/worker.go
  15. 21
      proc/state/index.go
  16. 29
      utils/speed/v2/speed.go
  17. 77
      utils/speed/v2/writer.go
  18. 36
      utils/speed/v2/writer_internal.go

0
utils/file/dcom/exec.go → cliapps/dcom/exec.go

0
utils/file/dcom/init.go → cliapps/dcom/init.go

2
cliapps/ffmpeg/options.go

@@ -126,7 +126,7 @@ func (a AMFQuality) Option() []string {
}
const (
AMFQualitySpeed = `speed`
AMFQualitySpeed = `v2`
AMFQualityBalanced = `balanced`
AMFQualityQuality = `quality`
)

26
net/apps/mdown/basic.go

@@ -1,26 +0,0 @@
package mdown
import (
"time"
"git.realxlfd.cc/RealXLFD/golib/cli/logger"
)
var (
	// log is the package-wide logger shared by all mdown code.
	log = logger.New()
)

// Target lifecycle states. Normal flow:
// WAITING -> HEAD -> READY -> RUNNING -> SUCCESS; FAILED is terminal.
const (
	FAILED = iota - 1 // terminal error state
	WAITING           // created, HEAD probe not yet started
	HEAD              // HEAD probe in flight
	READY             // details known, download not yet started
	RUNNING           // download in progress
	SUCCESS           // download finished
)

var (
	// HeadRetryCount is how many extra attempts the HEAD probe gets.
	HeadRetryCount = 2
	// HeadRetryGap is the pause between HEAD probe attempts.
	HeadRetryGap = 300 * time.Millisecond
)

45
net/apps/mdown/errors.go

@@ -1,45 +0,0 @@
package mdown
import (
"errors"
"io"
"net/http"
"strconv"
"git.realxlfd.cc/RealXLFD/golib/utils/str"
)
// ErrRequestFailed reports a non-2xx HTTP response, carrying the status
// code and the response body as the message.
type ErrRequestFailed struct {
	StatusCode int
	Msg        string
}

// Error implements the error interface.
func (e ErrRequestFailed) Error() string {
	return str.Join(
		"status: ", strconv.Itoa(e.StatusCode), " msg: ", e.Msg,
	)
}

// requestFailed wraps resp into an ErrRequestFailed, reading the whole
// body for the message. The read error is deliberately ignored (best
// effort: an empty message is acceptable).
func requestFailed(resp *http.Response) error {
	body, _ := io.ReadAll(resp.Body)
	return ErrRequestFailed{
		StatusCode: resp.StatusCode,
		Msg:        string(body),
	}
}
// ErrNetworkErr marks an error as network-related; ErrFileOpenFailed
// marks a local file-write failure.
// NOTE(review): these are interface aliases of error, not distinct
// concrete types. errors.As with a *ErrNetworkErr target matches ANY
// error in the chain, so errorDetail below always takes the first
// branch. Distinct wrapper structs would be needed for the
// classification to actually work — confirm intent before relying on it.
type ErrNetworkErr error
type ErrFileOpenFailed error

// errorDetail renders err with a category prefix for logging.
func errorDetail(err error) string {
	var errNetworkErr ErrNetworkErr
	var errFileOpenFailed ErrFileOpenFailed
	switch {
	case errors.As(err, &errNetworkErr):
		return str.Join("网络错误:", err.Error())
	case errors.As(err, &errFileOpenFailed):
		return str.Join("写入失败:", err.Error())
	}
	return err.Error()
}

32
net/apps/mdown/range.go

@@ -1,32 +0,0 @@
package mdown
import (
"net/http"
"strconv"
"git.realxlfd.cc/RealXLFD/golib/utils/str"
)
func rangeGenerator(total, parts int) (ranges []http.Header) {
each := total / parts
for i := range parts - 1 {
ranges = append(
ranges, rangeHeader(i*each, (i+1)*each-1),
)
}
ranges = append(ranges, rangeHeader((parts-1)*each, total))
return ranges
}
// rangeHeader builds an HTTP header carrying a single inclusive
// "Range: bytes=start-end" entry.
func rangeHeader(start, end int) http.Header {
	value := str.Join("bytes=", strconv.Itoa(start), "-", strconv.Itoa(end))
	return http.Header{"Range": {value}}
}

218
net/apps/mdown/target.go

@@ -1,218 +0,0 @@
package mdown
import (
	"errors"
	"io"
	"mime"
	"net/http"
	"net/url"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"time"

	"git.realxlfd.cc/RealXLFD/golib/proc/state"
	"git.realxlfd.cc/RealXLFD/golib/utils"
	"git.realxlfd.cc/RealXLFD/golib/utils/ioplus"
	"git.realxlfd.cc/RealXLFD/golib/utils/str"
)
// Target describes one download: where to fetch from, where to store the
// result, and the metadata probed from the server needed to plan the
// transfer.
type Target struct {
	URL     *url.URL
	cookies http.Header // extra request headers (currently just "Cookie")
	Path    string      // destination directory (must be a directory)
	proxy   *url.URL
	state   *state.Manager // lifecycle state machine (see package consts)
	client  *http.Client   // built lazily by getClient
	details struct {
		Filename     string
		ContentSize  int  // from Content-Length, in bytes
		AcceptRanges bool // server advertised "Accept-Ranges: bytes"
	}
}
// NewTarget constructs a download target for URL whose result will be
// stored under the directory Path, creating the directory if needed.
// Panics on an unparsable URL or an unusable path.
func NewTarget(URL, Path string) *Target {
	parsed, err := url.Parse(URL)
	if err != nil {
		panic("错误的URL")
	}
	if info, statErr := os.Stat(Path); statErr != nil {
		// Path does not exist (or is unreachable): try to create it.
		if os.MkdirAll(Path, os.ModePerm) != nil {
			panic("创建文件夹失败")
		}
	} else if !info.IsDir() {
		panic("路径应是文件夹而不是文件")
	}
	return &Target{
		URL:   parsed,
		Path:  Path,
		state: state.New(WAITING),
	}
}
// JustDownload synchronously downloads the target into Path with a single
// plain GET request (no ranges, no resume). A target that already FAILED
// returns an error immediately; a SUCCESS target is a no-op.
//
// Returns nil on success, the underlying request error otherwise.
func (t *Target) JustDownload() error {
	switch t.state.Get() {
	case FAILED:
		return errors.New("下载失败")
	case SUCCESS:
		return nil
	default:
	}
	t.getClient() // TODO
	file := filepath.Join(t.Path, t.details.Filename)
	err := t.request(nil, file, nil)
	if err != nil {
		log.Error(err.Error())
	}
	// BUG fix: the original fell off the end of a function declared to
	// return error (compile error); propagate the result to the caller.
	return err
}
// request performs the GET for this target and streams the body to file.
// rangeHeader, when non-nil, is used as the request header and has the
// target's cookies merged into it — note: the caller's map is modified in
// place. s, when non-nil, wraps the file writer so throughput can be
// measured; a nil s writes directly to the file.
func (t *Target) request(
	rangeHeader http.Header, file string, s *ioplus.SWrite,
) error {
	req := &http.Request{
		URL:    t.URL,
		Header: t.cookies,
	}
	// BUG fix: the original merged cookies when rangeHeader was nil,
	// writing into a nil map and then installing nil as the header,
	// which dropped the cookies entirely.
	if rangeHeader != nil {
		utils.JoinMap(rangeHeader, t.cookies)
		req.Header = rangeHeader
	}
	resp, err := t.client.Do(req)
	if err != nil {
		return ErrNetworkErr(err)
	}
	defer func(Body io.ReadCloser) {
		_ = Body.Close()
	}(resp.Body)
	if resp.StatusCode >= 300 || resp.StatusCode < 200 {
		t.state.ToState(FAILED)
		return requestFailed(resp)
	}
	// Write the body to file.
	entry, err := os.OpenFile(
		file, os.O_CREATE|os.O_WRONLY|os.O_APPEND, os.ModePerm,
	) // TODO: 处理断点续传 (resume support)
	if err != nil {
		return ErrFileOpenFailed(err)
	}
	defer func(entry *os.File) {
		_ = entry.Close()
	}(entry)
	// BUG fix: s may legitimately be nil (see JustDownload); only bind
	// the speed-measuring writer when one was supplied.
	var writer io.Writer = entry
	if s != nil {
		writer = s.Bind(entry)
	}
	_, err = io.Copy(writer, resp.Body)
	if err != nil {
		// NOTE(review): a copy failure may also be a network error, but
		// the original classified it as a file error; preserved.
		return ErrFileOpenFailed(err)
	}
	return nil
}
// getClient (re)creates the HTTP client, wiring in the configured proxy
// when one has been set.
func (t *Target) getClient() {
	client := &http.Client{}
	if t.proxy != nil {
		client.Transport = &http.Transport{Proxy: http.ProxyURL(t.proxy)}
	}
	t.client = client
}
// Proxy sets the proxy URL used for subsequent requests and returns the
// target for chaining. Panics when the URL cannot be parsed.
func (t *Target) Proxy(proxy string) *Target {
	proxyURL, err := url.Parse(proxy)
	if err != nil {
		// Include the parse error detail: the original panicked with a
		// dangling "URL格式不正确:" and no explanation.
		panic(str.Join("URL格式不正确:", err.Error()))
	}
	t.proxy = proxyURL
	return t
}
// Cookies installs the raw cookie string as the request "Cookie" header
// and returns the target for chaining.
func (t *Target) Cookies(cookies string) *Target {
	header := make(http.Header, 1)
	header.Set("Cookie", cookies)
	t.cookies = header
	return t
}
// Get asynchronously launches the HEAD probe that fills in the target's
// details. Idempotent: a target already probing (HEAD) or probed (READY)
// is returned unchanged. On probe failure the state becomes FAILED and
// all state waiters are released.
func (t *Target) Get() *Target {
	if t.state.Is(HEAD, READY) {
		return t
	}
	// BUG fix: build the HTTP client (with any configured proxy) before
	// probing — head() dereferenced a nil t.client when Get was called
	// without a prior JustDownload, since nothing else initializes it.
	t.getClient()
	t.state.ToState(HEAD)
	go func() {
		err := t.head()
		if err != nil {
			log.Error(err.Error())
			t.state.ToState(FAILED)
			t.state.QuitAll()
			return
		}
		t.state.ToState(READY)
	}()
	return t
}
// head probes the target with a HEAD request to fill in t.details
// (filename, size, range support). The whole request is retried up to
// HeadRetryCount additional times with HeadRetryGap between attempts.
//
// BUG fix: the original issued one request and then re-parsed the SAME
// response in a loop — a deterministic parse can never succeed on retry,
// and HeadRetryGap was declared but never used. It also used a goto for
// plain control flow; restructured.
func (t *Target) head() error {
	var lastStatus int
	for i := range HeadRetryCount + 1 {
		if i > 0 {
			time.Sleep(HeadRetryGap)
		}
		query := &http.Request{
			Method: "HEAD",
			URL:    t.URL,
			Header: t.cookies,
		}
		resp, err := t.client.Do(query)
		if err != nil {
			return ErrNetworkErr(err)
		}
		if resp.StatusCode >= 300 || resp.StatusCode < 200 {
			err = requestFailed(resp)
			_ = resp.Body.Close()
			return err
		}
		lastStatus = resp.StatusCode
		ok := t.fromHeadGetDetails(resp)
		_ = resp.Body.Close()
		if ok {
			return nil
		}
		log.Error("获取文件信息失败,重试次数:", strconv.Itoa(i), t.URL.String())
	}
	return ErrRequestFailed{
		StatusCode: lastStatus,
		Msg:        str.Join("无法获取文件信息:", t.URL.String()),
	}
}
// fromHeadGetDetails extracts the download details (size, range support,
// filename) from a HEAD response. Returns false when Content-Length is
// missing or malformed; every other field falls back gracefully: the
// filename comes from Content-Disposition, else the URL path base, else
// the base plus an extension derived from Content-Type.
func (t *Target) fromHeadGetDetails(resp *http.Response) (ok bool) {
	contentLength := resp.Header.Get("Content-Length")
	var err error
	if t.details.ContentSize, err = strconv.Atoi(contentLength); contentLength == "" || err != nil {
		return false
	}
	acceptRanges := resp.Header.Get("Accept-Ranges")
	t.details.AcceptRanges = acceptRanges == "bytes"
	disposition := resp.Header.Get("Content-Disposition")
	// A parse failure just leaves params nil; the fallback below applies.
	_, params, _ := mime.ParseMediaType(disposition)
	if t.details.Filename, ok = params["filename"]; !ok {
		contentType := resp.Header.Get("Content-Type")
		base := filepath.Base(t.URL.String())
		if strings.Contains(base, ".") {
			t.details.Filename = base
		} else {
			var ext string
			exts, _ := mime.ExtensionsByType(contentType)
			if len(exts) != 0 {
				// BUG fix: the original indexed exts[1], which panics
				// when only one extension is registered for the type.
				ext = exts[0]
			}
			t.details.Filename = str.Join(base, ext)
		}
	}
	return true
}
// Wait blocks until the target finishes.
// TODO: not implemented — the body is intentionally empty for now.
func (t *Target) Wait() {
}

5
net/apps/mdown/thread.go

@@ -1,5 +0,0 @@
package mdown
type Thread struct {
Target *T
}

34
net/apps/multithread_downloader/client_option.go

@@ -1,34 +0,0 @@
package mdown
import (
"net/http"
"net/url"
)
// Option mutates a Client during construction.
type Option func(*Client)

// WithTransportProxy routes all client traffic through proxyURL.
func WithTransportProxy(proxyURL *url.URL) Option {
	return func(c *Client) {
		transport := &http.Transport{Proxy: http.ProxyURL(proxyURL)}
		c.Transport = transport
	}
}
// WithMaxThread caps the total number of download threads. Panics
// immediately (at option-construction time) when count is not positive.
func WithMaxThread(count int) Option {
	if count <= 0 {
		panic("count must be greater than zero")
	}
	return func(c *Client) {
		c.MaxThread = count
	}
}
// WithMaxThreadPerReq caps the thread count for any single download.
// Panics immediately when count is not positive.
func WithMaxThreadPerReq(count int) Option {
	if count <= 0 {
		panic("count must be greater than zero")
	}
	return func(c *Client) {
		c.MaxThreadPerURL = count
	}
}

50
net/apps/multithread_downloader/control.go

@@ -1,50 +0,0 @@
package mdown
import (
"net/http"
"time"
"git.realxlfd.cc/RealXLFD/golib/proc/thread/pool"
)
var (
	// TempFilesLocation is the directory where worker part files are
	// written before being merged into the final file.
	TempFilesLocation = "./temp"
)

const (
	DefaultMaxThread = 8                      // default maximum threads for a single file
	RetryTimeGap     = 500 * time.Millisecond // pause between retries
)

// Client aggregates download tasks and the shared scheduling
// configuration applied to them.
type Client struct {
	Transport       *http.Transport // optional custom transport (proxy etc.)
	tasks           []*Task
	pool            *pool.Pool[*Worker, struct{}]
	MaxThread       int // global thread cap
	MaxThreadPerURL int // per-download thread cap
}
// NewClient builds a Client with sane defaults (DefaultMaxThread for
// both the global and the per-URL caps) and applies the given options.
func NewClient(opts ...Option) *Client {
	// Keyed fields instead of the original positional literal:
	// positional initialization silently breaks when the struct
	// layout changes.
	client := &Client{
		Transport:       nil,
		tasks:           make([]*Task, 0),
		pool:            nil,
		MaxThread:       DefaultMaxThread,
		MaxThreadPerURL: DefaultMaxThread,
	}
	for _, opt := range opts {
		opt(client)
	}
	return client
}
// Download 异步提交一个下载任务
// func (c *Client) Download(
// URL string, file string, header http.Header,
// ) (*Task, error) {
// targetURL, err := url.Parse(URL)
// if err != nil {
// return nil, err
// }
// return nil, err
// }

268
net/apps/multithread_downloader/downloader.go

@@ -1,268 +0,0 @@
package mdown
import (
"errors"
"io"
"mime"
"net/http"
"net/url"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
"git.realxlfd.cc/RealXLFD/golib/cli/logger"
"git.realxlfd.cc/RealXLFD/golib/utils/str"
)
// init registers the .zip MIME mapping (used when deriving filenames
// from Content-Type) and pre-creates the shared temp-file directory.
// NOTE(review): the MkdirAll error is deliberately ignored; a failure
// surfaces later when workers try to create part files there.
func init() {
	_ = mime.AddExtensionType(".zip", "application/zip")
	_ = os.MkdirAll(TempFilesLocation, os.ModePerm)
}
// Task is a single multithreaded download: one URL fetched into Path,
// split across workers whose part files are merged by End().
type Task struct {
	client    *http.Client
	TargetURL *url.URL
	Path      string // destination directory
	Settings  *Settings
	Details   Details
	State     State           // written by the HEAD goroutine and workers
	group     *sync.WaitGroup // counts the HEAD probe and every worker
	workers   []*Worker
	pool      ThreadPool
}

// Settings holds per-task tunables supplied via TaskOptions.
type Settings struct {
	TempDir   string // directory for the .partN files
	Retry     int    // extra attempts for HEAD and for each worker
	Headers   http.Header
	Condition func(*Details) int // 控制线程数量: decides the thread count from the probed details
}

// Details is the metadata probed by the HEAD request.
type Details struct {
	Filename     string
	ContentSize  int  // bytes, from Content-Length
	AcceptRanges bool // server advertised "Accept-Ranges: bytes"
}

// State is the task lifecycle.
type State int

const (
	FAILED State = iota - 1 // terminal error state
	WAITING                 // created, HEAD probe still running
	READY                   // probe done, download not started
	DOWNLOADING             // workers running
	SUCCESS                 // parts merged into the final file
)

var (
	// l is the package logger.
	// NOTE(review): zero-value logger.Logger{} — the sibling mdown
	// package uses logger.New(); confirm the zero value is usable.
	l = logger.Logger{}
)
// NewTask creates a download task for URL targeting the existing
// directory path, applies opts, and asynchronously launches the HEAD
// probe. The task starts in WAITING; the probe moves it to READY or
// FAILED. Use Start()/End() to drive the actual download.
func NewTask(URL string, path string, opts ...TaskOptions) (*Task, error) {
	target, err := url.Parse(URL)
	if err != nil {
		return nil, err
	}
	// The destination must already exist and be a directory (unlike the
	// mdown sibling package, which creates it).
	stat, err := os.Stat(path)
	if err != nil || !stat.IsDir() {
		return nil, errors.New(str.Join("invalid path: ", path))
	}
	task := &Task{
		client:    &http.Client{},
		TargetURL: target,
		Path:      path,
		Settings:  &Settings{},
		Details:   Details{},
		State:     WAITING,
		group:     &sync.WaitGroup{},
		workers:   make([]*Worker, 0),
		pool:      nil,
	}
	for i := range opts {
		err = opts[i](task)
		if err != nil {
			return nil, err
		}
	}
	// One WaitGroup slot for the HEAD probe; Start()/End() wait on it
	// before spawning workers.
	task.group.Add(1)
	go func() {
		var err error
		for range task.Settings.Retry + 1 {
			err = task.head()
			if err != nil {
				l.Error(err.Error())
				time.Sleep(RetryTimeGap)
				continue
			}
			break
		}
		// NOTE(review): State is written here without synchronization;
		// readers that do not go through group.Wait() may race.
		if err != nil {
			task.State = FAILED
		} else {
			task.State = READY
		}
		task.group.Done()
	}()
	return task, nil
}
// Start begins the download once the HEAD probe has finished.
//
// Safe to call in any state: DOWNLOADING/SUCCESS are no-ops, WAITING
// blocks until the probe completes, and a FAILED task logs and returns
// without spawning workers.
func (d *Task) Start() *Task {
	if d.State == WAITING {
		// Block until the HEAD goroutine from NewTask finishes.
		d.group.Wait()
	}
	switch d.State {
	case READY:
		// Fall through to spawning below.
	case DOWNLOADING, SUCCESS:
		return d
	default:
		// BUG fix: the original spawned workers even when the HEAD
		// probe had FAILED — the WAITING branch never rechecked the
		// state after group.Wait().
		l.Error("任务失败,Start()调用无效")
		return d
	}
	threads := DefaultMaxThread
	if d.Settings.Condition != nil {
		threads = d.Settings.Condition(&d.Details)
	}
	d.spawn(threads)
	return d
}
// spawn creates the workers and starts them — through the thread pool
// when one is attached, otherwise with a single worker run synchronously
// on the calling goroutine.
func (d *Task) spawn(threads int) {
	var workers []*Worker
	var ranges []requestRange
	// BUG fix: the original split ranges when the server accepted ranges
	// OR a pool existed. Splitting without range support corrupts the
	// download (every worker gets the whole file), and splitting without
	// a pool created N workers but only ever ran the first one, leaving
	// the WaitGroup counter undrained so End() deadlocked.
	// Multithreading requires BOTH a pool and range support.
	if d.pool != nil && d.Details.AcceptRanges {
		ranges = rangeGenerator(d.Details.ContentSize, threads)
	} else {
		threads = 1
		ranges = []requestRange{
			{0, d.Details.ContentSize},
		}
	}
	for i := range threads {
		w := &Worker{
			Task:  d,
			Range: ranges[i],
			Url:   d.TargetURL,
			File: filepath.Join(
				d.Settings.TempDir,
				str.Join(d.Details.Filename, ".part", strconv.Itoa(i)),
			),
			WG: d.group,
		}
		workers = append(workers, w)
	}
	d.group.Add(len(workers))
	d.workers = workers
	// Mark DOWNLOADING before the (possibly synchronous) start so the
	// state is accurate while the transfer runs.
	d.State = DOWNLOADING
	if d.pool != nil {
		d.pool.Push(workers...)
	} else {
		workers[0].Do()
	}
}
// head probes the target with a HEAD request and fills in d.Details:
// content size (required), range support, and a filename derived from
// Content-Disposition, falling back to the URL path base plus a MIME
// extension.
func (d *Task) head() error {
	headQuery := &http.Request{
		Method: "HEAD",
		URL:    d.TargetURL,
		Header: d.Settings.Headers,
	}
	resp, err := d.client.Do(headQuery)
	if err != nil {
		return err
	}
	defer func() {
		_ = resp.Body.Close()
	}()
	// Robustness fix: the original never checked the HTTP status and
	// would happily parse an error response's headers.
	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		return errors.New(str.Join("HEAD请求失败:", strconv.Itoa(resp.StatusCode)))
	}
	contentLength := resp.Header.Get("Content-Length")
	if d.Details.ContentSize, err = strconv.Atoi(contentLength); contentLength == "" || err != nil {
		return errors.New("无法获取文件大小")
	}
	acceptRanges := resp.Header.Get("Accept-Ranges")
	d.Details.AcceptRanges = acceptRanges == "bytes"
	disposition := resp.Header.Get("Content-Disposition")
	// A parse failure just leaves params nil and triggers the fallback.
	_, params, _ := mime.ParseMediaType(disposition)
	var ok bool
	if d.Details.Filename, ok = params["filename"]; !ok {
		contentType := resp.Header.Get("Content-Type")
		base := filepath.Base(d.TargetURL.String())
		if strings.Contains(base, ".") {
			d.Details.Filename = base
		} else {
			var ext string
			exts, _ := mime.ExtensionsByType(contentType)
			if len(exts) != 0 {
				// BUG fix: exts[1] panics when only one extension is
				// registered for the content type; take the first.
				ext = exts[0]
			}
			d.Details.Filename = str.Join(base, ext)
		}
	}
	return nil
}
// End waits for the download to finish and merges the worker part files,
// in order, into the final file under d.Path. It can be called from any
// state and drives the task through Start() as needed. Temp part files
// are removed on success.
func (d *Task) End() error {
	switch d.State {
	case READY:
		d.Start()
	case DOWNLOADING:
		// Already running; just wait below.
	case SUCCESS:
		return nil
	case WAITING:
		d.group.Wait()
		d.Start()
	default:
		return errors.New("任务失败")
	}
	d.group.Wait()
	if d.State == FAILED {
		return errors.New(str.Join("下载失败:", d.Details.Filename))
	}
	path := filepath.Join(d.Path, d.Details.Filename)
	targetFile, err := os.Create(path)
	if err != nil {
		d.State = FAILED
		l.Error(str.Join("无法创建文件:", d.Details.Filename))
		return errors.New("无法创建文件")
	}
	defer func() {
		_ = targetFile.Close()
	}()
	if err = d.mergeParts(targetFile); err != nil {
		// BUG fix: merge failures now mark the task FAILED, consistent
		// with the file-creation failure path above.
		d.State = FAILED
		l.Error(str.Join("文件合并失败:", d.Details.Filename))
		return err
	}
	d.State = SUCCESS
	d.clearTempFiles()
	return nil
}

// mergeParts appends every worker's part file, in order, to dst.
// Each part is closed before the next is opened (the original leaked the
// open part file when io.Copy failed).
func (d *Task) mergeParts(dst *os.File) error {
	for i := range d.workers {
		part, err := os.OpenFile(d.workers[i].File, os.O_RDONLY, os.ModePerm)
		if err != nil {
			return err
		}
		_, err = io.Copy(dst, part)
		_ = part.Close()
		if err != nil {
			return err
		}
	}
	return nil
}
// clearTempFiles best-effort deletes every worker's part file.
func (d *Task) clearTempFiles() {
	for _, w := range d.workers {
		_ = os.Remove(w.File)
	}
}

20
net/apps/multithread_downloader/range.go

@@ -1,20 +0,0 @@
package mdown
type requestRange struct {
Start int
End int
}
func rangeGenerator(total, parts int) (ranges []requestRange) {
each := total / parts
for i := range parts {
ranges = append(
ranges, requestRange{
i * each,
(i+1)*each - 1,
},
)
}
ranges[len(ranges)-1].End = total
return ranges
}

41
net/apps/multithread_downloader/task_options.go

@@ -1,41 +0,0 @@
package mdown
import (
	"errors"
	"net/http"
	"net/url"
)
// ThreadPool is the minimal scheduling interface a Task needs: something
// that accepts workers for asynchronous execution.
type ThreadPool interface {
	Push(worker ...*Worker)
}

// TaskOptions configures a Task at construction time; returning a
// non-nil error aborts NewTask.
type TaskOptions func(*Task) error
// WithProxy routes the task's HTTP traffic through the given proxy URL.
// A malformed URL is reported as an error from NewTask when the option
// is applied.
func WithProxy(URL string) TaskOptions {
	proxy, parseErr := url.Parse(URL)
	return func(task *Task) error {
		if parseErr != nil {
			return parseErr
		}
		task.client.Transport = &http.Transport{
			Proxy: http.ProxyURL(proxy),
		}
		return nil
	}
}
// WithThreadPool attaches a worker pool, enabling multithreaded
// downloads (without one, tasks run a single synchronous worker).
func WithThreadPool(pool ThreadPool) TaskOptions {
	return func(t *Task) error {
		t.pool = pool
		return nil
	}
}
// WithSetting replaces the task's entire Settings block.
//
// BUG fix: a nil settings pointer is rejected up front — the Task
// dereferences Settings unconditionally (Retry, Headers, Condition),
// so accepting nil guaranteed a later panic.
func WithSetting(settings *Settings) TaskOptions {
	return func(task *Task) error {
		if settings == nil {
			return errors.New("settings must not be nil")
		}
		task.Settings = settings
		return nil
	}
}

116
net/apps/multithread_downloader/worker.go

@@ -1,116 +0,0 @@
package mdown
import (
"io"
"net/http"
"net/url"
"os"
"strconv"
"sync"
"git.realxlfd.cc/RealXLFD/golib/utils/ioplus"
"git.realxlfd.cc/RealXLFD/golib/utils/str"
"github.com/dustin/go-humanize"
)
// Worker downloads one byte range of its Task into a temp part file.
type Worker struct {
	Task   *Task
	Failed bool         // set when all attempts were exhausted
	Range  requestRange // inclusive byte range this worker fetches
	Url    *url.URL
	File   string // part-file path under Settings.TempDir
	WG     *sync.WaitGroup
	Writer *ioplus.SWrite // speed-measuring writer for the current attempt
}
// Do executes this worker's ranged download, retrying transient failures
// up to the task's Retry count. The shared WaitGroup is released exactly
// once, on success or final failure.
func (w *Worker) Do() {
	rangeValue := str.Join(
		"bytes=",
		strconv.Itoa(w.Range.Start),
		"-",
		strconv.Itoa(w.Range.End),
	)
	header := http.Header{"Range": []string{rangeValue}}
	for key := range w.Task.Settings.Headers {
		header[key] = w.Task.Settings.Headers[key]
	}
	req := &http.Request{
		Header: header,
		URL:    w.Url,
	}
	l.Info(str.Join("发起下载:", w.File))
	defer w.WG.Done()
	attempts := w.Task.Settings.Retry + 1
	for range attempts {
		status := w.request(req)
		if status == Success {
			l.Info(
				str.Join(
					"下载完成,一共写入:",
					humanize.Bytes(uint64(w.Writer.Total())),
				),
			)
			return
		}
		if status == Fatal {
			// Local file errors cannot be fixed by retrying.
			break
		}
		l.Info(str.Join("重试:", w.File))
	}
	l.Error(str.Join("失败:", w.File))
	w.Failed = true
	w.Task.State = FAILED
}
// ReqStatus classifies the outcome of a single download attempt.
type ReqStatus int

const (
	Fatal   ReqStatus = iota - 1 // unrecoverable local error; do not retry
	Retry                        // transient network/HTTP failure; try again
	Success                      // range fully written to the part file
)
// request performs one download attempt into w.File, truncating any
// previous partial content. Returns Retry for transient (network/HTTP)
// failures, Fatal for local file errors, Success otherwise.
func (w *Worker) request(req *http.Request) (status ReqStatus) {
	resp, err := w.Task.client.Do(req)
	if err != nil {
		return Retry
	}
	defer func() {
		_ = resp.Body.Close()
	}()
	if resp.StatusCode >= 300 || resp.StatusCode < 200 {
		l.Error(str.Join("响应:", strconv.Itoa(resp.StatusCode)))
		return Retry
	}
	// BUG fix: the original opened with os.O_CREATE only — flag value 0
	// is O_RDONLY, so every subsequent write failed. O_TRUNC also
	// replaces the original's manual Truncate(0)+Seek(0,0) dance.
	file, err := os.OpenFile(w.File, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, os.ModePerm)
	if err != nil {
		l.Error(str.Join("无法打开文件:", w.File))
		return Fatal
	}
	defer func() {
		_ = file.Close()
	}()
	// Bind the speed-measuring writer so Do can report bytes written.
	w.Writer = ioplus.NewSWriteCloser(file)
	_, err = io.Copy(w.Writer, resp.Body)
	if err != nil {
		l.Error(str.Join("无法写入文件:", w.File))
		return Fatal
	}
	return Success
}

21
proc/state/index.go

@@ -28,17 +28,25 @@ func New(state State) *Manager {
}
}
func (m *Manager) ToState(state State) {
func (m *Manager) To(state State) {
m.lock.Lock()
defer m.lock.Unlock()
m.State = state
m.cond.Broadcast()
}
// contains reports whether state appears in states.
func contains(state State, states []State) bool {
	for _, candidate := range states {
		if candidate == state {
			return true
		}
	}
	return false
}
func (m *Manager) WaitState(state State) (ok bool) {
func (m *Manager) Wait(states ...State) (ok bool) {
m.lock.Lock()
defer m.lock.Unlock()
for m.State != state {
for contains(m.State, states) {
m.cond.Wait()
if m.exit {
return false
@@ -64,10 +72,5 @@ func (m *Manager) QuitAll() {
}
func (m *Manager) Is(states ...State) bool {
for _, state := range states {
if m.State == state {
return true
}
}
return false
return contains(m.State, states)
}

29
utils/speed/v2/speed.go

@@ -0,0 +1,29 @@
package speed
import (
"github.com/dustin/go-humanize"
)
// Speed aggregates throughput statistics: rates in bytes per second and
// the running total of bytes written.
type Speed struct {
	current int64 // most recently measured rate (bytes/s)
	total   int64 // total bytes written so far
	average int64 // average rate over the active (non-paused) lifetime
}
// Average returns the average throughput in bytes per second over the
// measured (non-paused) lifetime.
func (s Speed) Average() int64 {
	return s.average
}

// Current returns the most recently measured throughput in bytes/s.
func (s Speed) Current() int64 {
	return s.current
}

// Total returns the total number of bytes written so far.
func (s Speed) Total() int64 {
	return s.total
}

// String renders the current and average rates in humanized form.
// NOTE(review): the two-value signature does not satisfy fmt.Stringer
// despite the name — rename (e.g. Humanize) if Stringer is wanted.
func (s Speed) String() (cur, avg string) {
	cur = humanize.Bytes(uint64(s.current))
	avg = humanize.Bytes(uint64(s.average))
	return
}

77
utils/speed/v2/writer.go

@@ -0,0 +1,77 @@
package speed
import (
"sync"
"time"
"git.realxlfd.cc/RealXLFD/golib/utils/containers/queue"
)
// WriteCloser measures the throughput of data written through it.
// Chunk sizes are queued by Write and periodically folded into Speed by
// a background daemon goroutine (see Run/Pause/Close).
// NOTE(review): the running flag is read and written from multiple
// goroutines without synchronization — confirm whether it needs an
// atomic or the mutex.
type WriteCloser struct {
	Speed    Speed
	queue    *queue.Queue[int64] // pending chunk sizes, drained by calc
	lock     *sync.Mutex
	start    int64 // UnixMilli timestamp when the current run started
	lastCalc int64 // UnixMilli timestamp of the previous calc pass
	last     int64 // accumulated active milliseconds from earlier runs
	refresh  time.Duration
	running  bool
	promise  *sync.WaitGroup // tracks the daemon goroutine's lifetime
}
// Running reports whether the measurement daemon is active.
// NOTE(review): unsynchronized read of a flag written by other
// goroutines.
func (w *WriteCloser) Running() bool {
	return w.running
}
// Write records the chunk size for the speed daemon and reports the full
// length as written; the data itself is not stored anywhere.
func (w *WriteCloser) Write(p []byte) (n int, err error) {
	w.lock.Lock()
	defer w.lock.Unlock()
	size := len(p)
	if size > 0 {
		w.queue.Push(int64(size))
	}
	return size, nil
}
// Close stops the daemon, waits for it to exit, then runs one final calc
// pass so the last queued writes are reflected in Speed.
// NOTE(review): despite the type's name this does not satisfy io.Closer
// (no error return); add an error result if the interface is wanted.
func (w *WriteCloser) Close() {
	w.running = false
	w.promise.Wait()
	calc(w)
}
// Recover restarts measurement after a Pause/Close, first waiting for
// the previous daemon goroutine to fully exit. A running writer is
// returned unchanged.
func (w *WriteCloser) Recover() *WriteCloser {
	if !w.running {
		w.promise.Wait()
		w.Run()
	}
	return w
}
// Pause stops the measurement daemon; time spent paused is excluded from
// the average once measurement resumes via Recover.
func (w *WriteCloser) Pause() *WriteCloser {
	if w.running {
		w.running = false
	}
	return w
}
// Run starts the measurement daemon if it is not already running.
// NOTE(review): the running check-then-set is not synchronized; two
// concurrent Run calls could start two daemons — confirm
// single-goroutine use or guard with the mutex.
func (w *WriteCloser) Run() *WriteCloser {
	if w.running {
		return w
	}
	w.running = true
	w.promise.Add(1)
	go daemon(w)
	return w
}
// NewWriter builds a stopped WriteCloser that refreshes its speed
// figures every interval; call Run to start measuring.
func NewWriter(interval time.Duration) *WriteCloser {
	w := &WriteCloser{
		queue:   queue.New[int64](),
		lock:    new(sync.Mutex),
		refresh: interval,
		promise: new(sync.WaitGroup),
	}
	return w
}

36
utils/speed/v2/writer_internal.go

@@ -0,0 +1,36 @@
package speed
import (
"time"
)
// daemon is the background refresh loop started by Run. It stamps the
// run's start time, then recomputes the speed figures every w.refresh
// until running is cleared. On exit it adds this run's elapsed time to
// w.last (so paused periods are excluded from the average) and releases
// the promise WaitGroup.
func daemon(w *WriteCloser) {
	w.start = time.Now().UnixMilli()
	w.lastCalc = w.start
	// Small warm-up so the first measurement interval is never tiny.
	time.Sleep(time.Millisecond * 50)
	for w.running {
		calc(w)
		time.Sleep(w.refresh)
	}
	w.last += time.Now().UnixMilli() - w.start
	w.promise.Done()
}
// calc drains the queued chunk sizes and refreshes the speed figures:
// current is bytes/s over the interval since the previous pass, average
// is bytes/s over all active (non-paused) time.
func calc(w *WriteCloser) {
	w.lock.Lock()
	defer w.lock.Unlock()
	var written int64
	for w.queue.Size() > 0 {
		written += w.queue.Pop()
	}
	now := time.Now().UnixMilli()
	interval := now - w.lastCalc
	w.lastCalc = now
	if written > 0 {
		// BUG fix: two calc passes within the same millisecond (e.g. a
		// Close immediately after a refresh tick) made interval zero and
		// divided by zero; clamp to 1ms.
		if interval <= 0 {
			interval = 1
		}
		w.Speed.current = written * 1000 / interval
		w.Speed.total += written
	}
	if w.Speed.total > 0 {
		// Same zero guard for the active-lifetime denominator.
		elapsed := now - w.start + w.last
		if elapsed <= 0 {
			elapsed = 1
		}
		w.Speed.average = w.Speed.total * 1000 / elapsed
	}
}
Loading…
Cancel
Save