diff --git a/utils/file/dcom/exec.go b/cliapps/dcom/exec.go similarity index 100% rename from utils/file/dcom/exec.go rename to cliapps/dcom/exec.go diff --git a/utils/file/dcom/init.go b/cliapps/dcom/init.go similarity index 100% rename from utils/file/dcom/init.go rename to cliapps/dcom/init.go diff --git a/cliapps/ffmpeg/options.go b/cliapps/ffmpeg/options.go index 587035e..82979cc 100644 --- a/cliapps/ffmpeg/options.go +++ b/cliapps/ffmpeg/options.go @@ -126,7 +126,7 @@ func (a AMFQuality) Option() []string { } const ( - AMFQualitySpeed = `speed` + AMFQualitySpeed = `v2` AMFQualityBalanced = `balanced` AMFQualityQuality = `quality` ) diff --git a/net/apps/mdown/basic.go b/net/apps/mdown/basic.go deleted file mode 100644 index 2ec1a37..0000000 --- a/net/apps/mdown/basic.go +++ /dev/null @@ -1,26 +0,0 @@ -package mdown - -import ( - "time" - - "git.realxlfd.cc/RealXLFD/golib/cli/logger" -) - -var ( - log = logger.New() -) - -// WAITING -> HEAD -> READY -const ( - FAILED = iota - 1 - WAITING - HEAD - READY - RUNNING - SUCCESS -) - -var ( - HeadRetryCount = 2 - HeadRetryGap = 300 * time.Millisecond -) diff --git a/net/apps/mdown/errors.go b/net/apps/mdown/errors.go deleted file mode 100644 index 257eb9f..0000000 --- a/net/apps/mdown/errors.go +++ /dev/null @@ -1,45 +0,0 @@ -package mdown - -import ( - "errors" - "io" - "net/http" - "strconv" - - "git.realxlfd.cc/RealXLFD/golib/utils/str" -) - -type ErrRequestFailed struct { - StatusCode int - Msg string -} - -func (e ErrRequestFailed) Error() string { - return str.Join( - "status: ", strconv.Itoa(e.StatusCode), " msg: ", e.Msg, - ) -} - -func requestFailed(resp *http.Response) error { - body, _ := io.ReadAll(resp.Body) - return ErrRequestFailed{ - StatusCode: resp.StatusCode, - Msg: string(body), - } -} - -type ErrNetworkErr error - -type ErrFileOpenFailed error - -func errorDetail(err error) string { - var errNetworkErr ErrNetworkErr - var errFileOpenFailed ErrFileOpenFailed - switch { - case errors.As(err, &errNetworkErr): - return str.Join("网络错误:", err.Error()) - case errors.As(err, &errFileOpenFailed): - return str.Join("写入失败:", err.Error()) - } - return err.Error() -} diff --git a/net/apps/mdown/range.go b/net/apps/mdown/range.go deleted file mode 100644 index b6d4214..0000000 --- a/net/apps/mdown/range.go +++ /dev/null @@ -1,32 +0,0 @@ -package mdown - -import ( - "net/http" - "strconv" - - "git.realxlfd.cc/RealXLFD/golib/utils/str" -) - -func rangeGenerator(total, parts int) (ranges []http.Header) { - each := total / parts - for i := range parts - 1 { - ranges = append( - ranges, rangeHeader(i*each, (i+1)*each-1), - ) - } - ranges = append(ranges, rangeHeader((parts-1)*each, total)) - return ranges -} - -func rangeHeader(start, end int) http.Header { - return http.Header{ - "Range": []string{ - str.Join( - "bytes=", - strconv.Itoa(start), - "-", - strconv.Itoa(end), - ), - }, - } -} diff --git a/net/apps/mdown/target.go b/net/apps/mdown/target.go deleted file mode 100644 index 6f090a8..0000000 --- a/net/apps/mdown/target.go +++ /dev/null @@ -1,218 +0,0 @@ -package mdown - -import ( - "errors" - "io" - "mime" - "net/http" - "net/url" - "os" - "path/filepath" - "strconv" - "strings" - - "git.realxlfd.cc/RealXLFD/golib/proc/state" - "git.realxlfd.cc/RealXLFD/golib/utils" - "git.realxlfd.cc/RealXLFD/golib/utils/ioplus" - "git.realxlfd.cc/RealXLFD/golib/utils/str" -) - -type Target struct { - URL *url.URL - cookies http.Header - Path string - proxy *url.URL - state *state.Manager - client *http.Client - details struct { - Filename string - ContentSize int - AcceptRanges bool - } -} - -func NewTarget(URL, Path string) *Target { - target, err := url.Parse(URL) - if err != nil { - panic("错误的URL") - } - stat, err := os.Stat(Path) - switch { - case err != nil: - err = os.MkdirAll(Path, os.ModePerm) - if err != nil { - panic("创建文件夹失败") - } - case !stat.IsDir(): - panic("路径应是文件夹而不是文件") - } - return &Target{ - URL: target, - Path: Path, - state: state.New(WAITING), - } -} - -func (t *Target) JustDownload() error { - switch t.state.Get() { - case FAILED: - return errors.New("下载失败") - case SUCCESS: - return nil - default: - } - t.getClient() // TODO - file := filepath.Join(t.Path, t.details.Filename) - err := t.request(nil, file, nil) - if err != nil { - - log.Error(err.Error()) - } - -} - -// 注意!内部将修改Range -func (t *Target) request( - rangeHeader http.Header, file string, s *ioplus.SWrite, -) error { - req := &http.Request{ - URL: t.URL, - Header: t.cookies, - } - if rangeHeader == nil { - utils.JoinMap(rangeHeader, t.cookies) - req.Header = rangeHeader - } - resp, err := t.client.Do(req) - if err != nil { - return ErrNetworkErr(err) - } - defer func(Body io.ReadCloser) { - _ = Body.Close() - }(resp.Body) - if resp.StatusCode >= 300 || resp.StatusCode < 200 { - t.state.ToState(FAILED) - return requestFailed(resp) - } - // write to file - entry, err := os.OpenFile( - file, os.O_CREATE|os.O_WRONLY|os.O_APPEND, os.ModePerm, - ) // TODO: 处理断点续传 - if err != nil { - return ErrFileOpenFailed(err) - } - defer func(entry *os.File) { - _ = entry.Close() - }(entry) - writer := s.Bind(entry) // 将写入器绑定以便测量速度 - _, err = io.Copy(writer, resp.Body) - if err != nil { - return ErrFileOpenFailed(err) - } - return nil -} - -func (t *Target) getClient() { - t.client = &http.Client{} - if t.proxy != nil { - t.client.Transport = &http.Transport{ - Proxy: http.ProxyURL(t.proxy), - } - } -} - -func (t *Target) Proxy(proxy string) *Target { - proxyURL, err := url.Parse(proxy) - if err != nil { - panic("URL格式不正确:") - } - t.proxy = proxyURL - return t -} - -func (t *Target) Cookies(cookies string) *Target { - t.cookies = http.Header{ - "Cookie": []string{cookies}, - } - return t -} - -func (t *Target) Get() *Target { - if t.state.Is(HEAD, READY) { - return t - } - t.state.ToState(HEAD) - go func() { - err := t.head() - if err != nil { - log.Error(err.Error()) - t.state.ToState(FAILED) - t.state.QuitAll() - return - } - t.state.ToState(READY) - }() - return t -} - -func (t *Target) head() error { - query := &http.Request{ - Method: "HEAD", - URL: t.URL, - Header: t.cookies, - } - resp, err := t.client.Do(query) - if err != nil { - return ErrNetworkErr(err) - } - defer func(Body io.ReadCloser) { - _ = Body.Close() - }(resp.Body) - if resp.StatusCode >= 300 || resp.StatusCode < 200 { - return requestFailed(resp) - } - var ok bool - for i := range HeadRetryCount + 1 { - ok = t.fromHeadGetDetails(resp) - if ok { - goto SUCCESS - } - log.Error("获取文件信息失败,重试次数:", strconv.Itoa(i), t.URL.String()) - } - return ErrRequestFailed{ - StatusCode: resp.StatusCode, - Msg: str.Join("无法获取文件信息:", t.URL.String()), - } -SUCCESS: - return nil -} -func (t *Target) fromHeadGetDetails(resp *http.Response) (ok bool) { - contentLength := resp.Header.Get("Content-Length") - var err error - if t.details.ContentSize, err = strconv.Atoi(contentLength); contentLength == "" || err != nil { - return false - } - acceptRanges := resp.Header.Get("Accept-Ranges") - t.details.AcceptRanges = acceptRanges == "bytes" - disposition := resp.Header.Get("Content-Disposition") - _, params, err := mime.ParseMediaType(disposition) - if t.details.Filename, ok = params["filename"]; !ok { - contentType := resp.Header.Get("Content-Type") - base := filepath.Base(t.URL.String()) - if strings.Contains(base, ".") { - t.details.Filename = base - } else { - var ext string - exts, _ := mime.ExtensionsByType(contentType) - if len(exts) != 0 { - ext = exts[1] - } - t.details.Filename = str.Join(base, ext) - } - } - return true -} - -func (t *Target) Wait() { - -} diff --git a/net/apps/mdown/thread.go b/net/apps/mdown/thread.go deleted file mode 100644 index 4ca21e8..0000000 --- a/net/apps/mdown/thread.go +++ /dev/null @@ -1,5 +0,0 @@ -package mdown - -type Thread struct { - Target *T -} diff --git a/net/apps/multithread_downloader/client_option.go b/net/apps/multithread_downloader/client_option.go deleted file mode 100644 index af9ec55..0000000 --- a/net/apps/multithread_downloader/client_option.go +++ /dev/null @@ -1,34 +0,0 @@ -package mdown - -import ( - "net/http" - "net/url" -) - -type Option func(*Client) - -func WithTransportProxy(proxyURL *url.URL) Option { - return func(client *Client) { - client.Transport = &http.Transport{ - Proxy: http.ProxyURL(proxyURL), - } - } -} - -func WithMaxThread(count int) Option { - if count <= 0 { - panic("count must be greater than zero") - } - return func(client *Client) { - client.MaxThread = count - } -} - -func WithMaxThreadPerReq(count int) Option { - if count <= 0 { - panic("count must be greater than zero") - } - return func(client *Client) { - client.MaxThreadPerURL = count - } -} diff --git a/net/apps/multithread_downloader/control.go b/net/apps/multithread_downloader/control.go deleted file mode 100644 index f1b15e5..0000000 --- a/net/apps/multithread_downloader/control.go +++ /dev/null @@ -1,50 +0,0 @@ -package mdown - -import ( - "net/http" - "time" - - "git.realxlfd.cc/RealXLFD/golib/proc/thread/pool" -) - -var ( - TempFilesLocation = "./temp" -) - -const ( - DefaultMaxThread = 8 // 默认单文件最大线程数 - RetryTimeGap = 500 * time.Millisecond // 重试间隔 -) - -type Client struct { - Transport *http.Transport - tasks []*Task - pool *pool.Pool[*Worker, struct{}] - MaxThread int - MaxThreadPerURL int -} - -func NewClient(opts ...Option) *Client { - client := &Client{ - nil, - make([]*Task, 0), - nil, - DefaultMaxThread, - DefaultMaxThread, - } - for _, opt := range opts { - opt(client) - } - return client -} - -// Download 异步提交一个下载任务 -// func (c *Client) Download( -// URL string, file string, header http.Header, -// ) (*Task, error) { -// targetURL, err := url.Parse(URL) -// if err != nil { -// return nil, err -// } -// return nil, err -// } diff --git a/net/apps/multithread_downloader/downloader.go b/net/apps/multithread_downloader/downloader.go deleted file mode 100644 index d1873fe..0000000 --- a/net/apps/multithread_downloader/downloader.go +++ /dev/null @@ -1,268 +0,0 @@ -package mdown - -import ( - "errors" - "io" - "mime" - "net/http" - "net/url" - "os" - "path/filepath" - "strconv" - "strings" - "sync" - "time" - - "git.realxlfd.cc/RealXLFD/golib/cli/logger" - "git.realxlfd.cc/RealXLFD/golib/utils/str" -) - -func init() { - _ = mime.AddExtensionType(".zip", "application/zip") - _ = os.MkdirAll(TempFilesLocation, os.ModePerm) -} - -type Task struct { - client *http.Client - TargetURL *url.URL - Path string - Settings *Settings - Details Details - State State - group *sync.WaitGroup - workers []*Worker - pool ThreadPool -} - -type Settings struct { - TempDir string - Retry int - Headers http.Header - Condition func(*Details) int // 控制线程数量 -} - -type Details struct { - Filename string - ContentSize int - AcceptRanges bool -} - -type State int - -const ( - FAILED State = iota - 1 - WAITING - READY - DOWNLOADING - SUCCESS -) - -var ( - l = logger.Logger{} -) - -func NewTask(URL string, path string, opts ...TaskOptions) (*Task, error) { - target, err := url.Parse(URL) - if err != nil { - return nil, err - } - stat, err := os.Stat(path) - if err != nil || !stat.IsDir() { - return nil, errors.New(str.Join("invalid path: ", path)) - } - task := &Task{ - client: &http.Client{}, - TargetURL: target, - Path: path, - Settings: &Settings{}, - Details: Details{}, - State: WAITING, - group: &sync.WaitGroup{}, - workers: make([]*Worker, 0), - pool: nil, - } - for i := range opts { - err = opts[i](task) - if err != nil { - return nil, err - } - } - task.group.Add(1) - go func() { - var err error - for range task.Settings.Retry + 1 { - err = task.head() - if err != nil { - l.Error(err.Error()) - time.Sleep(RetryTimeGap) - continue - } - break - } - if err != nil { - task.State = FAILED - } else { - task.State = READY - } - task.group.Done() - }() - return task, nil -} - -func (d *Task) Start() *Task { - switch d.State { - case READY: - break - case DOWNLOADING: - fallthrough - case SUCCESS: - return d - case WAITING: - d.group.Wait() - default: - l.Error("任务失败,Start()调用无效") - return d - } - threads := DefaultMaxThread - if d.Settings.Condition != nil { - threads = d.Settings.Condition(&d.Details) - } - d.spawn(threads) - return d -} - -func (d *Task) spawn(threads int) { - var workers []*Worker - var ranges []requestRange - if d.Details.AcceptRanges || d.pool != nil { - ranges = rangeGenerator(d.Details.ContentSize, threads) - } else { - threads = 1 - ranges = []requestRange{ - { - 0, d.Details.ContentSize, - }, - } - } - for i := range threads { - w := &Worker{ - d, - false, - ranges[i], - d.TargetURL, - filepath.Join( - d.Settings.TempDir, - str.Join(d.Details.Filename, ".part", strconv.Itoa(i)), - ), - d.group, - nil, - } - workers = append(workers, w) - } - d.group.Add(len(workers)) - if d.pool != nil { - d.pool.Push(workers...) - } else { - workers[0].Do() - } - d.workers = workers - d.State = DOWNLOADING - return -} - -func (d *Task) head() error { - headQuery := &http.Request{ - Method: "HEAD", - URL: d.TargetURL, - Header: d.Settings.Headers, - } - resp, err := d.client.Do(headQuery) - if err != nil { - return err - } - defer func() { - _ = resp.Body.Close() - }() - contentLength := resp.Header.Get("Content-Length") - if d.Details.ContentSize, err = strconv.Atoi(contentLength); contentLength == "" || err != nil { - return errors.New("无法获取文件大小") - } - acceptRanges := resp.Header.Get("Accept-Ranges") - d.Details.AcceptRanges = acceptRanges == "bytes" - disposition := resp.Header.Get("Content-Disposition") - _, params, err := mime.ParseMediaType(disposition) - var ok bool - if d.Details.Filename, ok = params["filename"]; !ok { - contentType := resp.Header.Get("Content-Type") - base := filepath.Base(d.TargetURL.String()) - if strings.Contains(base, ".") { - d.Details.Filename = base - } else { - var ext string - exts, _ := mime.ExtensionsByType(contentType) - if len(exts) != 0 { - ext = exts[1] - } - d.Details.Filename = str.Join(base, ext) - } - } - return nil -} - -func (d *Task) End() error { - switch d.State { - case READY: - d.Start() - case DOWNLOADING: - break - case SUCCESS: - return nil - case WAITING: - d.group.Wait() - d.Start() - default: - return errors.New("任务失败") - } - d.group.Wait() - if d.State == FAILED { - return errors.New(str.Join("下载失败:", d.Details.Filename)) - } - path := filepath.Join(d.Path, d.Details.Filename) - - targetFile, err := os.Create(path) - if err != nil { - d.State = FAILED - l.Error(str.Join("无法创建文件:", d.Details.Filename)) - return errors.New("无法创建文件") - } - defer func() { - _ = targetFile.Close() - }() - for i := range d.workers { - var part *os.File - part, err = os.OpenFile( - d.workers[i].File, - os.O_RDONLY, - os.ModePerm, - ) - if err != nil { - l.Error(str.Join("文件合并失败:", d.Details.Filename)) - return err - } - _, err = io.Copy(targetFile, part) - if err != nil { - l.Error(str.Join("文件合并失败:", d.Details.Filename)) - return err - } - _ = part.Close() - } - d.State = SUCCESS - defer d.clearTempFiles() - return nil -} - -func (d *Task) clearTempFiles() { - for i := range d.workers { - _ = os.Remove(d.workers[i].File) - } -} diff --git a/net/apps/multithread_downloader/range.go b/net/apps/multithread_downloader/range.go deleted file mode 100644 index 412b166..0000000 --- a/net/apps/multithread_downloader/range.go +++ /dev/null @@ -1,20 +0,0 @@ -package mdown - -type requestRange struct { - Start int - End int -} - -func rangeGenerator(total, parts int) (ranges []requestRange) { - each := total / parts - for i := range parts { - ranges = append( - ranges, requestRange{ - i * each, - (i+1)*each - 1, - }, - ) - } - ranges[len(ranges)-1].End = total - return ranges -} diff --git a/net/apps/multithread_downloader/task_options.go b/net/apps/multithread_downloader/task_options.go deleted file mode 100644 index 45627d2..0000000 --- a/net/apps/multithread_downloader/task_options.go +++ /dev/null @@ -1,41 +0,0 @@ -package mdown - -import ( - "net/http" - "net/url" -) - -type ThreadPool interface { - Push(worker ...*Worker) -} - -type TaskOptions func(*Task) error - -func WithProxy(URL string) TaskOptions { - proxy, err := url.Parse(URL) - if err != nil { - return func(_ *Task) error { - return err - } - } - return func(task *Task) error { - task.client.Transport = &http.Transport{ - Proxy: http.ProxyURL(proxy), - } - return nil - } -} - -func WithThreadPool(pool ThreadPool) TaskOptions { - return func(task *Task) error { - task.pool = pool - return nil - } -} - -func WithSetting(settings *Settings) TaskOptions { - return func(task *Task) error { - task.Settings = settings - return nil - } -} diff --git a/net/apps/multithread_downloader/worker.go b/net/apps/multithread_downloader/worker.go deleted file mode 100644 index 0433e52..0000000 --- a/net/apps/multithread_downloader/worker.go +++ /dev/null @@ -1,116 +0,0 @@ -package mdown - -import ( - "io" - "net/http" - "net/url" - "os" - "strconv" - "sync" - - "git.realxlfd.cc/RealXLFD/golib/utils/ioplus" - "git.realxlfd.cc/RealXLFD/golib/utils/str" - "github.com/dustin/go-humanize" -) - -type Worker struct { - Task *Task - Failed bool - Range requestRange - Url *url.URL - File string - WG *sync.WaitGroup - Writer *ioplus.SWrite -} - -func (w *Worker) Do() { - header := http.Header{ - "Range": []string{ - str.Join( - "bytes=", - strconv.Itoa(w.Range.Start), - "-", - strconv.Itoa(w.Range.End), - ), - }, - } - for k := range w.Task.Settings.Headers { - header[k] = w.Task.Settings.Headers[k] - } - req := &http.Request{ - Header: header, - URL: w.Url, - } - l.Info(str.Join("发起下载:", w.File)) - for range w.Task.Settings.Retry + 1 { - switch w.request(req) { - case Retry: - l.Info(str.Join("重试:", w.File)) - continue - case Fatal: - break - case Success: - l.Info( - str.Join( - "下载完成,一共写入:", - humanize.Bytes(uint64(w.Writer.Total())), - ), - ) - w.WG.Done() - return - } - break - } - l.Error(str.Join("失败:", w.File)) - w.Failed = true - w.Task.State = FAILED - w.WG.Done() - return -} - -type ReqStatus int - -const ( - Fatal ReqStatus = iota - 1 - Retry - Success -) - -func (w *Worker) request(req *http.Request) (status ReqStatus) { - resp, err := w.Task.client.Do(req) - if err != nil { - return Retry - } - defer func() { - _ = resp.Body.Close() - }() - if resp.StatusCode >= 300 { - l.Error(str.Join("响应:", strconv.Itoa(resp.StatusCode))) - return Retry - } - file, err := os.OpenFile(w.File, os.O_CREATE, os.ModePerm) - if err != nil { - l.Error(str.Join("无法打开文件:", w.File)) - return Fatal - } - err = file.Truncate(0) - if err != nil { - l.Error(str.Join("无法覆写文件:", w.File)) - return Fatal - } - _, err = file.Seek(0, 0) - if err != nil { - l.Error(str.Join("无法覆写文件:", w.File)) - return Fatal - } - defer func() { - _ = file.Close() - }() - w.Writer = ioplus.NewSWriteCloser(file) - _, err = io.Copy(w.Writer, resp.Body) - if err != nil { - l.Error(str.Join("无法写入文件:", w.File)) - return Fatal - } - return Success -} diff --git a/proc/state/index.go b/proc/state/index.go index 72d5b93..a46cf21 100644 --- a/proc/state/index.go +++ b/proc/state/index.go @@ -28,17 +28,25 @@ func New(state State) *Manager { } } -func (m *Manager) ToState(state State) { +func (m *Manager) To(state State) { m.lock.Lock() defer m.lock.Unlock() m.State = state m.cond.Broadcast() } +func contains(state State, states []State) bool { + for i := range states { + if states[i] == state { + return true + } + } + return false +} -func (m *Manager) WaitState(state State) (ok bool) { +func (m *Manager) Wait(states ...State) (ok bool) { m.lock.Lock() defer m.lock.Unlock() - for m.State != state { + for contains(m.State, states) { m.cond.Wait() if m.exit { return false @@ -64,10 +72,5 @@ func (m *Manager) QuitAll() { } func (m *Manager) Is(states ...State) bool { - for _, state := range states { - if m.State == state { - return true - } - } - return false + return contains(m.State, states) } diff --git a/utils/speed/v2/speed.go b/utils/speed/v2/speed.go new file mode 100644 index 0000000..37545a6 --- /dev/null +++ b/utils/speed/v2/speed.go @@ -0,0 +1,29 @@ +package speed + +import ( + "github.com/dustin/go-humanize" +) + +type Speed struct { + current int64 + total int64 + average int64 +} + +func (s Speed) Average() int64 { + return s.average +} + +func (s Speed) Current() int64 { + return s.current +} + +func (s Speed) Total() int64 { + return s.total +} + +func (s Speed) String() (cur, avg string) { + cur = humanize.Bytes(uint64(s.current)) + avg = humanize.Bytes(uint64(s.average)) + return +} diff --git a/utils/speed/v2/writer.go b/utils/speed/v2/writer.go new file mode 100644 index 0000000..daefa13 --- /dev/null +++ b/utils/speed/v2/writer.go @@ -0,0 +1,77 @@ +package speed + +import ( + "sync" + "time" + + "git.realxlfd.cc/RealXLFD/golib/utils/containers/queue" +) + +type WriteCloser struct { + Speed Speed + queue *queue.Queue[int64] + lock *sync.Mutex + start int64 + lastCalc int64 + last int64 + refresh time.Duration + running bool + promise *sync.WaitGroup +} + +func (w *WriteCloser) Running() bool { + return w.running +} + +func (w *WriteCloser) Write(p []byte) (n int, err error) { + w.lock.Lock() + defer w.lock.Unlock() + n = len(p) + if n > 0 { + w.queue.Push(int64(n)) + } + return n, nil +} + +func (w *WriteCloser) Close() { + w.running = false + w.promise.Wait() + calc(w) +} + +func (w *WriteCloser) Recover() *WriteCloser { + if w.running { + return w + } + w.promise.Wait() + w.Run() + return w +} + +func (w *WriteCloser) Pause() *WriteCloser { + if !w.running { + return w + } + w.running = false + return w +} + +func (w *WriteCloser) Run() *WriteCloser { + if w.running { + return w + } + w.running = true + w.promise.Add(1) + go daemon(w) + return w +} + +func NewWriter(interval time.Duration) *WriteCloser { + return &WriteCloser{ + queue: queue.New[int64](), + lock: &sync.Mutex{}, + refresh: interval, + running: false, + promise: &sync.WaitGroup{}, + } +} diff --git a/utils/speed/v2/writer_internal.go b/utils/speed/v2/writer_internal.go new file mode 100644 index 0000000..b68c7b6 --- /dev/null +++ b/utils/speed/v2/writer_internal.go @@ -0,0 +1,36 @@ +package speed + +import ( + "time" +) + +func daemon(w *WriteCloser) { + w.start = time.Now().UnixMilli() + w.lastCalc = w.start + time.Sleep(time.Millisecond * 50) + for w.running { + calc(w) + time.Sleep(w.refresh) + } + w.last += time.Now().UnixMilli() - w.start + w.promise.Done() +} + +func calc(w *WriteCloser) { + w.lock.Lock() + defer w.lock.Unlock() + var written int64 + for w.queue.Size() > 0 { + written += w.queue.Pop() + } + now := time.Now().UnixMilli() + interval := now - w.lastCalc + w.lastCalc = now + if written > 0 { + w.Speed.current = written * 1000 / interval + w.Speed.total += written + } + if w.Speed.total > 0 { + w.Speed.average = w.Speed.total * 1000 / (now - w.start + w.last) + } +}