RealXLFD 4 weeks ago
parent
commit
006b5e4aef
  1. 12
      cli/logger/index.go
  2. 26
      net/apps/mdown/basic.go
  3. 45
      net/apps/mdown/errors.go
  4. 32
      net/apps/mdown/range.go
  5. 218
      net/apps/mdown/target.go
  6. 5
      net/apps/mdown/thread.go
  7. 2
      net/apps/multithread_downloader/worker.go
  8. 73
      proc/state/index.go
  9. 8
      utils/index.go
  10. 22
      utils/ioplus/reader.go
  11. 30
      utils/ioplus/writer.go

12
cli/logger/index.go

@ -55,7 +55,8 @@ func New(opts ...Option) *Logger {
return logger
}
func (l *Logger) Info(msg string) {
func (l *Logger) Info(msgs ...string) {
msg := str.Join(msgs...)
if l.Level < LevelInfo {
return
}
@ -69,7 +70,8 @@ func (l *Logger) Info(msg string) {
OutputFunc(fmt.Sprint(str.Join(timeStamp, argInfo, " "), msg))
}
func (l *Logger) Debug(msg string) {
func (l *Logger) Debug(msgs ...string) {
msg := str.Join(msgs...)
if l.Level < LevelDebug {
return
}
@ -83,7 +85,8 @@ func (l *Logger) Debug(msg string) {
OutputFunc(fmt.Sprint(str.Join(timeStamp, argDebug, " "), msg))
}
func (l *Logger) Warn(msg string) {
func (l *Logger) Warn(msgs ...string) {
msg := str.Join(msgs...)
if l.Level < LevelWarn {
return
}
@ -97,7 +100,8 @@ func (l *Logger) Warn(msg string) {
OutputFunc(fmt.Sprint(str.Join(timeStamp, argWarn, " "), msg))
}
func (l *Logger) Error(msg string) {
func (l *Logger) Error(msgs ...string) {
msg := str.Join(msgs...)
if l.Level < LevelError {
return
}

26
net/apps/mdown/basic.go

@ -0,0 +1,26 @@
package mdown
import (
"time"
"git.realxlfd.cc/RealXLFD/golib/cli/logger"
)
var (
log = logger.New()
)
// WAITING -> HEAD -> READY
const (
FAILED = iota - 1
WAITING
HEAD
READY
RUNNING
SUCCESS
)
var (
HeadRetryCount = 2
HeadRetryGap = 300 * time.Millisecond
)

45
net/apps/mdown/errors.go

@ -0,0 +1,45 @@
package mdown
import (
"errors"
"io"
"net/http"
"strconv"
"git.realxlfd.cc/RealXLFD/golib/utils/str"
)
type ErrRequestFailed struct {
StatusCode int
Msg string
}
func (e ErrRequestFailed) Error() string {
return str.Join(
"status: ", strconv.Itoa(e.StatusCode), " msg: ", e.Msg,
)
}
func requestFailed(resp *http.Response) error {
body, _ := io.ReadAll(resp.Body)
return ErrRequestFailed{
StatusCode: resp.StatusCode,
Msg: string(body),
}
}
type ErrNetworkErr error
type ErrFileOpenFailed error
func errorDetail(err error) string {
var errNetworkErr ErrNetworkErr
var errFileOpenFailed ErrFileOpenFailed
switch {
case errors.As(err, &errNetworkErr):
return str.Join("网络错误:", err.Error())
case errors.As(err, &errFileOpenFailed):
return str.Join("写入失败:", err.Error())
}
return err.Error()
}

32
net/apps/mdown/range.go

@ -0,0 +1,32 @@
package mdown
import (
"net/http"
"strconv"
"git.realxlfd.cc/RealXLFD/golib/utils/str"
)
func rangeGenerator(total, parts int) (ranges []http.Header) {
each := total / parts
for i := range parts - 1 {
ranges = append(
ranges, rangeHeader(i*each, (i+1)*each-1),
)
}
ranges = append(ranges, rangeHeader((parts-1)*each, total))
return ranges
}
func rangeHeader(start, end int) http.Header {
return http.Header{
"Range": []string{
str.Join(
"bytes=",
strconv.Itoa(start),
"-",
strconv.Itoa(end),
),
},
}
}

218
net/apps/mdown/target.go

@ -0,0 +1,218 @@
package mdown
import (
"errors"
"io"
"mime"
"net/http"
"net/url"
"os"
"path/filepath"
"strconv"
"strings"
"git.realxlfd.cc/RealXLFD/golib/proc/state"
"git.realxlfd.cc/RealXLFD/golib/utils"
"git.realxlfd.cc/RealXLFD/golib/utils/ioplus"
"git.realxlfd.cc/RealXLFD/golib/utils/str"
)
type Target struct {
URL *url.URL
cookies http.Header
Path string
proxy *url.URL
state *state.Manager
client *http.Client
details struct {
Filename string
ContentSize int
AcceptRanges bool
}
}
func NewTarget(URL, Path string) *Target {
target, err := url.Parse(URL)
if err != nil {
panic("错误的URL")
}
stat, err := os.Stat(Path)
switch {
case err != nil:
err = os.MkdirAll(Path, os.ModePerm)
if err != nil {
panic("创建文件夹失败")
}
case !stat.IsDir():
panic("路径应是文件夹而不是文件")
}
return &Target{
URL: target,
Path: Path,
state: state.New(WAITING),
}
}
func (t *Target) JustDownload() error {
switch t.state.Get() {
case FAILED:
return errors.New("下载失败")
case SUCCESS:
return nil
default:
}
t.getClient() // TODO
file := filepath.Join(t.Path, t.details.Filename)
err := t.request(nil, file, nil)
if err != nil {
log.Error(err.Error())
}
}
// 注意!内部将修改Range
func (t *Target) request(
rangeHeader http.Header, file string, s *ioplus.SWrite,
) error {
req := &http.Request{
URL: t.URL,
Header: t.cookies,
}
if rangeHeader == nil {
utils.JoinMap(rangeHeader, t.cookies)
req.Header = rangeHeader
}
resp, err := t.client.Do(req)
if err != nil {
return ErrNetworkErr(err)
}
defer func(Body io.ReadCloser) {
_ = Body.Close()
}(resp.Body)
if resp.StatusCode >= 300 || resp.StatusCode < 200 {
t.state.ToState(FAILED)
return requestFailed(resp)
}
// write to file
entry, err := os.OpenFile(
file, os.O_CREATE|os.O_WRONLY|os.O_APPEND, os.ModePerm,
) // TODO: 处理断点续传
if err != nil {
return ErrFileOpenFailed(err)
}
defer func(entry *os.File) {
_ = entry.Close()
}(entry)
writer := s.Bind(entry) // 将写入器绑定以便测量速度
_, err = io.Copy(writer, resp.Body)
if err != nil {
return ErrFileOpenFailed(err)
}
return nil
}
func (t *Target) getClient() {
t.client = &http.Client{}
if t.proxy != nil {
t.client.Transport = &http.Transport{
Proxy: http.ProxyURL(t.proxy),
}
}
}
func (t *Target) Proxy(proxy string) *Target {
proxyURL, err := url.Parse(proxy)
if err != nil {
panic("URL格式不正确:")
}
t.proxy = proxyURL
return t
}
func (t *Target) Cookies(cookies string) *Target {
t.cookies = http.Header{
"Cookie": []string{cookies},
}
return t
}
func (t *Target) Get() *Target {
if t.state.Is(HEAD, READY) {
return t
}
t.state.ToState(HEAD)
go func() {
err := t.head()
if err != nil {
log.Error(err.Error())
t.state.ToState(FAILED)
t.state.QuitAll()
return
}
t.state.ToState(READY)
}()
return t
}
func (t *Target) head() error {
query := &http.Request{
Method: "HEAD",
URL: t.URL,
Header: t.cookies,
}
resp, err := t.client.Do(query)
if err != nil {
return ErrNetworkErr(err)
}
defer func(Body io.ReadCloser) {
_ = Body.Close()
}(resp.Body)
if resp.StatusCode >= 300 || resp.StatusCode < 200 {
return requestFailed(resp)
}
var ok bool
for i := range HeadRetryCount + 1 {
ok = t.fromHeadGetDetails(resp)
if ok {
goto SUCCESS
}
log.Error("获取文件信息失败,重试次数:", strconv.Itoa(i), t.URL.String())
}
return ErrRequestFailed{
StatusCode: resp.StatusCode,
Msg: str.Join("无法获取文件信息:", t.URL.String()),
}
SUCCESS:
return nil
}
func (t *Target) fromHeadGetDetails(resp *http.Response) (ok bool) {
contentLength := resp.Header.Get("Content-Length")
var err error
if t.details.ContentSize, err = strconv.Atoi(contentLength); contentLength == "" || err != nil {
return false
}
acceptRanges := resp.Header.Get("Accept-Ranges")
t.details.AcceptRanges = acceptRanges == "bytes"
disposition := resp.Header.Get("Content-Disposition")
_, params, err := mime.ParseMediaType(disposition)
if t.details.Filename, ok = params["filename"]; !ok {
contentType := resp.Header.Get("Content-Type")
base := filepath.Base(t.URL.String())
if strings.Contains(base, ".") {
t.details.Filename = base
} else {
var ext string
exts, _ := mime.ExtensionsByType(contentType)
if len(exts) != 0 {
ext = exts[1]
}
t.details.Filename = str.Join(base, ext)
}
}
return true
}
func (t *Target) Wait() {
}

5
net/apps/mdown/thread.go

@ -0,0 +1,5 @@
package mdown
type Thread struct {
Target *T
}

2
net/apps/multithread_downloader/worker.go

@ -19,7 +19,7 @@ type Worker struct {
Url *url.URL
File string
WG *sync.WaitGroup
Writer *ioplus.SWriteCloser
Writer *ioplus.SWrite
}
func (w *Worker) Do() {

73
proc/state/index.go

@ -0,0 +1,73 @@
package state
import "sync"
type Manager struct {
State State
lock *sync.Mutex
cond *sync.Cond
exit bool
}
type State int
const (
FAILED State = iota - 1
WAITING
READY
RUNNING
SUCCESS
)
func New(state State) *Manager {
lock := &sync.Mutex{}
return &Manager{
State: state,
lock: lock,
cond: sync.NewCond(lock),
}
}
func (m *Manager) ToState(state State) {
m.lock.Lock()
defer m.lock.Unlock()
m.State = state
m.cond.Broadcast()
}
func (m *Manager) WaitState(state State) (ok bool) {
m.lock.Lock()
defer m.lock.Unlock()
for m.State != state {
m.cond.Wait()
if m.exit {
return false
}
}
return true
}
func (m *Manager) WaitChange() {
m.lock.Lock()
defer m.lock.Unlock()
m.cond.Wait()
return
}
func (m *Manager) Get() State {
return m.State
}
func (m *Manager) QuitAll() {
m.exit = true
m.cond.Broadcast()
}
func (m *Manager) Is(states ...State) bool {
for _, state := range states {
if m.State == state {
return true
}
}
return false
}

8
utils/index.go

@ -0,0 +1,8 @@
package utils
func JoinMap[A string | int | float64, B any](dst, src map[A]B) map[A]B {
for k := range src {
dst[k] = src[k]
}
return dst
}

22
utils/ioplus/reader.go

@ -6,17 +6,13 @@ import (
)
type SReadCloser struct {
r io.ReadCloser
r io.Reader
bytes int
start time.Time
lastCalc time.Time
speed int
}
func (s *SReadCloser) Close() error {
return s.r.Close()
}
func (s *SReadCloser) Read(data []byte) (n int, err error) {
n, err = s.r.Read(data)
s.bytes += n
@ -34,15 +30,15 @@ func (s *SReadCloser) Speed() int {
}
func (s *SReadCloser) Average() float64 {
return float64(s.bytes) / s.start.Sub(time.Now()).Seconds()
return float64(s.bytes) / s.start.Sub(s.lastCalc).Seconds()
}
func NewSReadCloser(r io.ReadCloser) *SReadCloser {
return &SReadCloser{
r,
0,
time.Now(),
time.Now(),
0,
func (s *SReadCloser) Bind(r io.Reader) io.Reader {
if s == nil {
return r
}
s.r = r
s.start = time.Now()
s.lastCalc = time.Now()
return s
}

30
utils/ioplus/writer.go

@ -5,19 +5,15 @@ import (
"time"
)
type SWriteCloser struct {
w io.WriteCloser
type SWrite struct {
w io.Writer
bytes int
start time.Time
lastCalc time.Time
speed int
}
func (s *SWriteCloser) Close() error {
return s.w.Close()
}
func (s *SWriteCloser) Write(data []byte) (n int, err error) {
func (s *SWrite) Write(data []byte) (n int, err error) {
n, err = s.w.Write(data)
s.bytes += n
now := time.Now()
@ -29,20 +25,20 @@ func (s *SWriteCloser) Write(data []byte) (n int, err error) {
return n, err
}
func (s *SWriteCloser) Speed() int {
func (s *SWrite) Speed() int {
return s.speed
}
func (s *SWriteCloser) Average() float64 {
return float64(s.bytes) / s.start.Sub(time.Now()).Seconds()
func (s *SWrite) Average() float64 {
return float64(s.bytes) / s.start.Sub(s.lastCalc).Seconds()
}
func NewSWriteCloser(w io.WriteCloser) *SWriteCloser {
return &SWriteCloser{
w,
0,
time.Now(),
time.Now(),
0,
func (s *SWrite) Bind(w io.Writer) io.Writer {
if s == nil {
return w
}
s.w = w
s.start = time.Now()
s.lastCalc = time.Now()
return s
}

Loading…
Cancel
Save