diff --git a/net/apps/multithread_downloader/control.go b/net/apps/multithread_downloader/control.go index ad9ec32..c687dfb 100644 --- a/net/apps/multithread_downloader/control.go +++ b/net/apps/multithread_downloader/control.go @@ -12,27 +12,71 @@ var ( const ( DefaultMaxThread = 32 - DefaultMaxThreadPerURL = 8 + DefaultMaxThreadPerReq = 8 ) type Client struct { - Client *http.Client - MaxThread uint64 - currentThread uint64 + Transport *http.Transport + distributor *Distributor + tasks []*Downloader } func NewClient(opts ...Option) *Client { client := &Client{ - Client: &http.Client{}, - MaxThread: DefaultMaxThread, - currentThread: 0, + Transport: nil, + distributor: &Distributor{ + running: true, + MaxThread: DefaultMaxThread, + MaxThreadPerReq: DefaultMaxThreadPerReq, + currentThread: 0, + acquire: make(chan uint), + getter: make(chan uint), + release: make(chan uint), + }, + tasks: make([]*Downloader, 0), } for _, opt := range opts { opt(client) } + go client.distributor.routine() return client } -func (d *Client) Download(url *url.URL, file *os.File, opts ...Option) { +// Download 异步提交一个下载任务 +func (c *Client) Download( + URL string, file string, header http.Header, +) (*Downloader, error) { + targetURL, err := url.Parse(URL) + if err != nil { + return nil, err + } + osFile, err := os.OpenFile(file, os.O_CREATE, os.ModePerm) + if err != nil { + return nil, err + } + downloader := createDownloader(c, osFile, targetURL, header) + go downloader.head(c.distributor) + c.tasks = append(c.tasks, downloader) + return downloader, nil +} +func (distri *Distributor) routine() uint { + for distri.running { + select { + case acquire := <-distri.acquire: + switch free := distri.MaxThread - distri.currentThread; { + case free >= acquire: + distri.currentThread += acquire + distri.getter <- acquire + case free <= 0: + release := <-distri.release + distri.getter <- release + default: + distri.currentThread += free + + } + case release := <-distri.release: + + } + } } diff --git a/net/apps/multithread_downloader/distributor.go b/net/apps/multithread_downloader/distributor.go new file mode 100644 index 0000000..55f7925 --- /dev/null +++ b/net/apps/multithread_downloader/distributor.go @@ -0,0 +1,11 @@ +package mdown + +type Distributor struct { + running bool + MaxThread uint + MaxThreadPerReq uint + currentThread uint + release chan uint + acquire chan uint + getter chan uint +} diff --git a/net/apps/multithread_downloader/downloader.go b/net/apps/multithread_downloader/downloader.go new file mode 100644 index 0000000..2ba8c1e --- /dev/null +++ b/net/apps/multithread_downloader/downloader.go @@ -0,0 +1,75 @@ +package mdown + +import ( + `net/http` + `net/url` + `os` + `strconv` +) + +type Downloader struct { + client *http.Client + targetURL *url.URL + file *os.File + headers http.Header + ContentSize int + AcceptRanges bool + State State + threads []*worker +} +type State int + +const ( + FAILED State = iota - 1 + WAITING + DOWNLOADING + SUCCESS +) + +func createDownloader( + client *Client, file *os.File, URL *url.URL, header http.Header, +) *Downloader { + downloader := &Downloader{ + &http.Client{ + Transport: client.Transport, + }, + URL, + file, + header, + 0, + false, + WAITING, + nil, + } + return downloader +} + +func (d *Downloader) head(distri *Distributor) { + headQuery := &http.Request{ + Method: "HEAD", + URL: d.targetURL, + Header: d.headers, + } + resp, err := d.client.Do(headQuery) + if err != nil { + d.State = FAILED + return + } + defer func() { + _ = resp.Body.Close() + }() + contentLength := resp.Header.Get("content-length") + if d.ContentSize, err = strconv.Atoi(contentLength); contentLength == "" || err != nil { + d.State = FAILED + return + } + acceptRanges := resp.Header.Get("accept-ranges") + d.AcceptRanges = acceptRanges == "bytes" + d.State = WAITING + client.acquire(client.) + d.generator() +} + +func (d *Downloader) generator(thread uint) { + +} \ No newline at end of file diff --git a/net/apps/multithread_downloader/option.go b/net/apps/multithread_downloader/option.go index 9ae0b1b..38cbb4f 100644 --- a/net/apps/multithread_downloader/option.go +++ b/net/apps/multithread_downloader/option.go @@ -9,14 +9,26 @@ type Option func(*Client) func WithTransportProxy(proxyURL *url.URL) Option { return func(client *Client) { - client.Client.Transport = &http.Transport{ + client.Transport = &http.Transport{ Proxy: http.ProxyURL(proxyURL), } } } func WithMaxThread(count int) Option { + if count < 0 { + panic("count must be greater than zero") + } + return func(client *Client) { + client.distributor.MaxThread = uint(count) + } +} + +func WithMaxThreadPerReq(count int) Option { + if count <= 0 { + panic("count must be greater than zero") + } return func(client *Client) { - client.MaxThread = uint64(count) + client.distributor.MaxThreadPerReq = uint(count) } }