
fix fscan

master
RealXLFD, 1 month ago
commit 58adde8da0

Changed files:
  1. proc/thread/pool/parallel.go (25)
  2. proc/thread/pool/pool.go (90)
  3. proc/thread/pool/state.go (1)
  4. utils/containers/queue/sync_queue.go (1)

proc/thread/pool/parallel.go (25)

@@ -0,0 +1,25 @@
package pool

// Runner wraps a Pool so that one handler can be fanned out over []any argument lists.
type Runner struct {
    pool *Pool[[]any, any]
}

// RunParallel builds a pool for handler with max workers (auto-sized when max <= 0) and starts it.
func RunParallel(handler func([]any) any, max int) *Runner {
    runner := &Runner{
        pool: New[[]any, any](max, handler).Run(),
    }
    return runner
}

// Run pushes the given arguments into the pool as a single task.
func (r *Runner) Run(args ...any) {
    r.pool.Push(args)
}

// Join waits for every pushed task to finish and shuts down the pool's input queue.
func (r *Runner) Join() {
    r.pool.Join()
}

// Results returns the results collected by the pool.
func (r *Runner) Results() (results []any) {
    return r.pool.GetAllResult()
}
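For context, here is a minimal usage sketch of the new Runner API. It is not part of this commit; the import path is a placeholder and the handler is purely illustrative:

package main

import (
    "fmt"

    "example.com/proj/proc/thread/pool" // placeholder module path
)

func main() {
    // Each call to Run pushes one []any task; this handler doubles its first argument.
    runner := pool.RunParallel(func(args []any) any {
        return args[0].(int) * 2
    }, 4) // at most 4 workers

    for i := 0; i < 8; i++ {
        runner.Run(i)
    }
    runner.Join()                 // block until all pushed tasks finish
    fmt.Println(runner.Results()) // results collected by the pool
}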

proc/thread/pool/pool.go (90)

@@ -1,6 +1,7 @@
package pool

import (
    "runtime"
    "sync"
    "sync/atomic"
@@ -9,29 +10,33 @@ import (
// Pool is the goroutine pool structure
type Pool[I any, O any] struct {
    group   sync.WaitGroup // number of tasks currently in flight, used to block while waiting for them to finish
    current atomic.Int32   // number of goroutines currently running
    state   State          // pool state
    max     int            // maximum parallelism
    group   *sync.WaitGroup // number of tasks currently in flight, used to block while waiting for them to finish
    current atomic.Int32    // number of goroutines currently running
    state   State           // pool state
    max     int             // maximum parallelism
    inQueue  *queue.SyncQueue[I]
    OutQueue *queue.SyncQueue[O]
    Task     func(I) O
    workers []*bool
    lock    *sync.Mutex
    pause   *sync.WaitGroup
}

// New creates a goroutine pool
func New[I any, O any](max int, task func(I) O) *Pool[I, O] {
    if max < 0 {
        panic("the pool's maximum parallelism must be a positive integer!")
    }
    return &Pool[I, O]{
    sync.WaitGroup{},
    pool := &Pool[I, O]{
        &sync.WaitGroup{},
        atomic.Int32{},
        Ready,
        max,
        queue.NewSync[I](),
        queue.NewSync[O](),
        task,
        make([]*bool, 0, runtime.NumCPU()),
        &sync.Mutex{},
        &sync.WaitGroup{},
    }
    return pool
}
// State returns the pool's state
@@ -47,6 +52,28 @@ func (p *Pool[I, O]) Max() int {
// Current returns the number of goroutines currently running
func (p *Pool[I, O]) Current() int {
    return int(p.current.Load())
}

func (p *Pool[I, O]) Pause() *Pool[I, O] {
    if p.state != Running {
        return p
    }
    p.lock.Lock()
    defer p.lock.Unlock()
    p.state = Paused
    for _, cur := range p.workers {
        *cur = false
    }
    return p
}

func (p *Pool[I, O]) Recover() *Pool[I, O] {
    if p.state != Paused {
        return p
    }
    p.lock.Lock()
    defer p.lock.Unlock()
    for _, cur :=
}
// Run starts the goroutine pool and begins executing tasks
@@ -55,16 +82,26 @@ func (p *Pool[I, O]) Run() *Pool[I, O] {
    if p.state == Running {
        return p
    }
    if p.state == Paused {
        p.Recover()
    }
    p.state = Running
    // start the workers
    for i := 0; i < p.max; i++ {
        go p.worker()
    var launchCount int
    if p.max > 0 {
        launchCount = p.max
    } else {
        launchCount = runtime.NumCPU()/2 + 1
    }
    for range launchCount {
        cur := true
        p.workers = append(p.workers, &cur)
        go worker(p, &cur)
    }
    return p
}

func (p *Pool[I, O]) worker() {
    for p.state != Stopped {
func worker[I any, O any](p *Pool[I, O], cur *bool) {
    for p.state != Stopped && *cur {
        input, exit := p.inQueue.SyncPop()
        // used to force an exit
        if exit {
@@ -84,6 +121,9 @@ func (p *Pool[I, O]) Join() {
    if p.state == Stopping || p.state == Stopped {
        return
    }
    if p.state == Paused {
        // TODO
    }
    p.state = Stopping
    p.group.Wait()
    p.inQueue.Close()
@@ -97,14 +137,36 @@ func (p *Pool[I, O]) Abort() {
        return
    }
    p.state = Stopped
    p.workers = make([]*bool, 0)
    p.inQueue.Close()
}

func (p *Pool[I, O]) Shrink() {
    p.lock.Lock()
    defer p.lock.Unlock()
    if p.inQueue.Size() != 0 && len(p.workers) > p.inQueue.Size() {
        delNum := len(p.workers) - p.inQueue.Size()
        for _, cur := range p.workers[:delNum] {
            *cur = false
        }
        p.workers = p.workers[delNum:]
    }
}

// Push lets callers push tasks into the pool from outside
func (p *Pool[I, O]) Push(inputs ...I) {
    if p.state == Stopped || p.state == Stopping {
        panic("pushed a task to a pool that has already been closed")
    }
    p.lock.Lock()
    if p.max <= 0 && len(p.workers) < p.inQueue.Size() {
        for range p.inQueue.Size() - len(p.workers) {
            cur := true
            p.workers = append(p.workers, &cur)
            go worker(p, &cur)
        }
    }
    p.lock.Unlock()
    p.group.Add(len(inputs))
    for i := range inputs {
        p.inQueue.Push(inputs[i])
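Taken together, the pool.go changes add dynamic sizing (with max <= 0, Run starts runtime.NumCPU()/2+1 workers and Push grows the worker list up to the queue length) plus the Pause and Shrink controls. A hedged usage sketch of the simple path follows; it is not part of the commit, the import path is a placeholder, and it deliberately avoids the still-unfinished Pause/Recover path:

package main

import (
    "fmt"

    "example.com/proj/proc/thread/pool" // placeholder module path
)

func main() {
    // max = 0: the pool picks its own worker count and Push may add more workers on demand.
    p := pool.New[int, int](0, func(n int) int { return n * n }).Run()

    p.Push(1, 2, 3, 4, 5)
    p.Join() // wait for the queued tasks, then close the input queue

    // OutQueue is the pool's public output queue; results are assumed to land here
    // (the worker body that fills it is not shown in this hunk).
    fmt.Println("results queued:", p.OutQueue.Size())
}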

proc/thread/pool/state.go (1)

@@ -8,4 +8,5 @@ const (
    Running  // running
    Stopping // waiting state before stopping: waits for the remaining tasks to finish
    Stopped  // stopped: further task execution is forcibly halted, and newly added tasks are reloaded into the buffer
    Paused
)

utils/containers/queue/sync_queue.go (1)

@@ -39,6 +39,7 @@ func (s *SyncQueue[T]) Close() {
    s.cond.Broadcast()
    s.lock.Unlock()
}

func (s *SyncQueue[T]) SyncPop() (T, bool) {
    s.lock.Lock()
    defer s.lock.Unlock()
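SyncPop is the blocking pop the worker loop above relies on (input, exit := p.inQueue.SyncPop()); its body is cut off in this view. A small consumer sketch, assuming a placeholder import path and that the second return value means "the queue was closed, stop waiting", as the worker's force-exit check suggests:

package main

import (
    "fmt"

    "example.com/proj/utils/containers/queue" // placeholder module path
)

func main() {
    q := queue.NewSync[int]()
    done := make(chan struct{})

    go func() {
        defer close(done)
        for {
            v, exit := q.SyncPop() // blocks until an item arrives or the queue is closed
            if exit {
                return // assumption: Close was called, so the consumer should stop
            }
            fmt.Println("got", v)
        }
    }()

    q.Push(1)
    q.Push(2)
    q.Close() // wakes the blocked SyncPop via cond.Broadcast
    <-done
}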
