realxlfd
4 weeks ago
17 changed files with 434 additions and 310 deletions
@ -1,11 +0,0 @@ |
|||
package mdown |
|||
|
|||
// Distributor (legacy, removed by this change): a channel-based
// thread-slot distributor, superseded by the mutex/cond implementation
// in package distri. Field semantics below are inferred from names
// only — the implementation is not visible in this diff.
type Distributor struct {
	running bool // presumably whether the distributor was started
	MaxThread uint
	MaxThreadPerReq uint
	currentThread uint
	release chan uint // slot-return channel (inferred from name)
	acquire chan uint // slot-request channel (inferred from name)
	getter chan uint
}
@ -0,0 +1,67 @@ |
|||
package distri |
|||
|
|||
import ( |
|||
"sync" |
|||
) |
|||
|
|||
// Distributor hands out up to MaxThread "thread slots" to callers.
// Acquire blocks while no slots are free; Release returns slots and
// wakes one waiter; Close wakes every waiter and makes all further
// calls return immediately.
type Distributor struct {
	running   bool // guarded by lock; false before Run and after Close
	MaxThread int  // total number of slots managed by this distributor
	free      int  // currently available slots; guarded by lock
	lock      sync.Mutex
	cond      sync.Cond // signalled on Release, broadcast on Close; cond.L is &lock
}

// New returns a Distributor managing max slots. Run must be called
// before Acquire/Release have any effect.
func New(max int) *Distributor {
	d := &Distributor{
		MaxThread: max,
		free:      max,
	}
	// The condition variable must share the distributor's mutex: a
	// zero-value sync.Cond has a nil Locker and Wait would panic.
	d.cond.L = &d.lock
	return d
}

// Run marks the distributor as active and returns it for chaining.
func (d *Distributor) Run() *Distributor {
	d.lock.Lock() // running is shared state; update it under the lock
	d.running = true
	d.lock.Unlock()
	return d
}

// Acquire blocks until at least one slot is free, then takes up to
// count slots and returns how many were actually taken. It returns 0
// without taking anything if the distributor is not running, or if it
// is closed while waiting.
func (d *Distributor) Acquire(count int) int {
	d.lock.Lock()
	// Deferred before any early return so the mutex is never leaked
	// (the original returned on !running while still holding the lock).
	defer d.lock.Unlock()
	if !d.running {
		return 0
	}
	for d.free <= 0 {
		d.cond.Wait()
		// Close broadcasts after clearing running; bail out then.
		if !d.running {
			return 0
		}
	}
	// Grant min(count, free) slots.
	acquired := count
	if d.free < count {
		acquired = d.free
	}
	d.free -= acquired
	return acquired
}

// Release returns count slots to the pool and wakes one waiter.
// It is a no-op once the distributor is closed or not yet running.
func (d *Distributor) Release(count int) {
	d.lock.Lock()
	defer d.lock.Unlock()
	if !d.running {
		return
	}
	d.free += count
	d.cond.Signal()
}

// Close stops the distributor and wakes every blocked Acquire, each of
// which then returns 0. Subsequent calls are no-ops.
func (d *Distributor) Close() {
	d.lock.Lock()
	defer d.lock.Unlock()
	if !d.running {
		return
	}
	d.running = false
	// Broadcast under the lock so no waiter can miss the wake-up
	// between its running check and its Wait.
	d.cond.Broadcast()
}
@ -0,0 +1,108 @@ |
|||
package distri_test |
|||
|
|||
import ( |
|||
"sync" |
|||
"testing" |
|||
"time" |
|||
|
|||
distri "git.realxlfd.cc/RealXLFD/golib/proc/thread/distributor" |
|||
"github.com/stretchr/testify/assert" |
|||
) |
|||
|
|||
func TestDistributor_AcquireAndRelease(t *testing.T) { |
|||
maxThreads := 10 |
|||
d := distri.New(maxThreads).Run() |
|||
|
|||
// Acquire less than max
|
|||
acquired := d.Acquire(5) |
|||
assert.Equal(t, 5, acquired, "Should acquire 5 threads") |
|||
|
|||
// Release and then acquire again
|
|||
d.Release(5) |
|||
acquired = d.Acquire(5) |
|||
assert.Equal( |
|||
t, |
|||
5, |
|||
acquired, |
|||
"Should re-acquire 5 threads after release", |
|||
) |
|||
|
|||
// Try to acquire more than available
|
|||
acquired = d.Acquire(10) |
|||
assert.Equal( |
|||
t, |
|||
5, |
|||
acquired, |
|||
"Should only acquire 5 threads because that's what's left", |
|||
) |
|||
|
|||
d.Release(5) |
|||
} |
|||
|
|||
func TestDistributor_Close(t *testing.T) { |
|||
d := distri.New(10).Run() |
|||
|
|||
var wg sync.WaitGroup |
|||
wg.Add(1) |
|||
|
|||
go func() { |
|||
defer wg.Done() |
|||
acquired := d.Acquire(1) |
|||
assert.Equal( |
|||
t, |
|||
1, |
|||
acquired, |
|||
"Should acquire 0 threads after close", |
|||
) |
|||
}() |
|||
|
|||
time.Sleep(time.Millisecond * 100) // Ensuring goroutine calls Acquire before Close
|
|||
d.Close() |
|||
wg.Wait() |
|||
} |
|||
|
|||
func TestDistributor_Concurrency(t *testing.T) { |
|||
d := distri.New(100).Run() |
|||
var wg sync.WaitGroup |
|||
|
|||
acquireAndRelease := func(count int) { |
|||
defer wg.Done() |
|||
acquired := d.Acquire(count) |
|||
assert.Equal( |
|||
t, |
|||
count, |
|||
acquired, |
|||
"Should acquire the exact count of threads", |
|||
) |
|||
time.Sleep(time.Millisecond * 10) // Simulate work
|
|||
d.Release(count) |
|||
} |
|||
|
|||
totalThreads := 100 |
|||
for i := 0; i < 10; i++ { |
|||
wg.Add(1) |
|||
go acquireAndRelease(totalThreads / 10) |
|||
} |
|||
|
|||
wg.Wait() |
|||
|
|||
// Check if all threads are released properly
|
|||
acquired := d.Acquire(totalThreads) |
|||
assert.Equal( |
|||
t, |
|||
totalThreads, |
|||
acquired, |
|||
"Should be able to acquire all threads after concurrent operations", |
|||
) |
|||
d.Release(totalThreads) |
|||
} |
|||
|
|||
func BenchmarkDistributor_AcquireRelease(b *testing.B) { |
|||
d := distri.New(100).Run() |
|||
b.ResetTimer() |
|||
|
|||
for i := 0; i < b.N; i++ { |
|||
d.Acquire(1) |
|||
d.Release(1) |
|||
} |
|||
} |
@ -0,0 +1,114 @@ |
|||
package pool |
|||
|
|||
import ( |
|||
"sync" |
|||
"sync/atomic" |
|||
) |
|||
|
|||
// ClosurePool is a worker pool that executes queued closures on a
// fixed number of goroutines.
type ClosurePool struct {
	group   sync.WaitGroup // counts in-flight and queued tasks so Join can block until completion
	current atomic.Int32   // number of tasks currently executing
	state   State          // pool lifecycle state (Ready/Running/Stopping/Stopped)
	max     int            // maximum number of concurrent workers
	queue   chan func()    // buffer; allows tasks to be queued before the pool starts
	exit    chan bool      // NOTE(review): never read or written in the visible code — confirm before removing
}
|||
|
|||
// NewClosure 创建一个闭包线程池
|
|||
func NewClosure(max int, cache int) *ClosurePool { |
|||
if max < 0 || cache < 0 { |
|||
panic("线程池最大并行数量为正整数!") |
|||
} |
|||
return &ClosurePool{ |
|||
group: sync.WaitGroup{}, |
|||
current: atomic.Int32{}, |
|||
state: Ready, |
|||
max: max, |
|||
queue: make(chan func(), cache), |
|||
exit: make(chan bool), |
|||
} |
|||
} |
|||
|
|||
// State returns the pool's current lifecycle state.
// NOTE(review): the field is read without synchronization, so the
// value may be stale while other goroutines transition the pool.
func (p *ClosurePool) State() State {
	return p.state
}
|||
|
|||
// Max returns the pool's maximum number of concurrent workers.
func (p *ClosurePool) Max() int {
	return p.max
}
|||
|
|||
// Current returns the number of tasks executing right now.
func (p *ClosurePool) Current() int {
	return int(p.current.Load())
}
|||
|
|||
// Run starts max worker goroutines and marks the pool Running.
// It returns the pool for call chaining and is a no-op when already
// running.
// NOTE(review): the state check-then-set is not synchronized, so two
// concurrent Run calls could each start a full set of workers —
// confirm callers invoke Run exactly once.
func (p *ClosurePool) Run() *ClosurePool {
	// Guard against repeated calls.
	if p.state == Running {
		return p
	}
	p.state = Running
	// Start the workers.
	for i := 0; i < p.max; i++ {
		go p.worker()
	}
	return p
}
|||
func (p *ClosurePool) worker() { |
|||
for p.state != Stopped { |
|||
task, ok := <-p.queue |
|||
// 用于强制退出
|
|||
if !ok { |
|||
break |
|||
} |
|||
p.current.Add(1) |
|||
// 启动任务线程
|
|||
task() |
|||
p.current.Add(-1) |
|||
p.group.Done() |
|||
} |
|||
} |
|||
|
|||
// Join stops accepting new tasks, waits for every queued and running
// task to finish, then marks the pool Stopped.
// NOTE(review): a Push racing with Join can send on the closed queue
// and panic — confirm all producers are stopped before Join is called.
func (p *ClosurePool) Join() {
	// Guard against repeated calls.
	if p.state == Stopping || p.state == Stopped {
		return
	}
	close(p.queue)
	p.state = Stopping
	p.group.Wait()
	p.state = Stopped
}
|||
|
|||
// Abort stops the pool immediately: workers exit after their current
// task, and tasks still sitting in the queue are discarded.
// NOTE(review): discarded tasks never reach group.Done(), so mixing
// Abort with a later Join/group.Wait would hang — verify the two
// shutdown paths are never combined.
func (p *ClosurePool) Abort() {
	// Guard against repeated calls.
	if p.state == Stopped {
		return
	}
	p.state = Stopped
	close(p.queue)
}
|||
|
|||
// Push 用于外界向线程池推送任务
|
|||
func (p *ClosurePool) Push(f func()) { |
|||
if p.state == Stopped || p.state == Stopping { |
|||
panic("在一个已经关闭了的线程上发送任务") |
|||
} |
|||
p.add(f) |
|||
return |
|||
} |
|||
// Queued reports how many tasks are waiting in the buffer.
func (p *ClosurePool) Queued() int {
	return len(p.queue)
}
|||
|
|||
// add is the enqueue implementation. The WaitGroup is incremented
// before the send so the task is counted even if a worker picks it up
// and finishes it immediately (the removed legacy Pool did these in
// the opposite, racy, order).
func (p *ClosurePool) add(f func()) {
	p.group.Add(1)
	p.queue <- f
}
@ -1,145 +0,0 @@ |
|||
package threadpool |
|||
|
|||
import ( |
|||
"sync" |
|||
"sync/atomic" |
|||
"time" |
|||
) |
|||
|
|||
// Pool (legacy, removed by this change) is the pre-refactor worker
// pool, superseded by ClosurePool, which replaces the hand-rolled
// atomic counters with atomic.Int32.
type Pool struct {
	group        sync.WaitGroup // counts outstanding tasks so Join can block until completion
	current      int32          // number of tasks currently executing (updated via sync/atomic)
	state        State          // pool lifecycle state
	max          int32          // maximum number of concurrent workers
	queueCurrent int32          // number of tasks in the buffer queue
	queue        chan func()    // buffer; allows tasks to be queued before the pool starts
	exit         chan bool      // NOTE(review): never used in the visible code
}
|||
|
|||
// State identifies the thread pool's lifecycle state.
type State int

const (
	Ready    = iota // preparation phase, before the pool is started
	Running         // running
	Stopping        // pre-stop wait state: remaining tasks are draining
	Stopped         // stopped; further task execution is forcibly suppressed
)
|||
|
|||
// New (legacy, removed by this change) creates a Pool with max workers
// and a task buffer of size cache; panics on negative arguments.
// NOTE(review): the guard allows max == 0 even though the panic
// message demands a positive integer — fixed in NewClosure.
func New(max int, cache int) *Pool {
	if max < 0 || cache < 0 {
		panic("线程池最大并行数量为正整数!")
	}
	return &Pool{
		group:        sync.WaitGroup{},
		current:      0,
		state:        Ready,
		max:          int32(max),
		queueCurrent: 0,
		queue: make(
			chan func(),
			cache,
		),
		exit: make(chan bool),
	}
}
|||
|
|||
// State returns the pool's current lifecycle state (unsynchronized read).
func (p *Pool) State() State {
	return p.state
}
|||
|
|||
// Max returns the pool's maximum number of concurrent workers.
func (p *Pool) Max() int {
	return int(p.max)
}
|||
|
|||
// Current returns the number of tasks executing right now.
// NOTE(review): plain read of a field written with sync/atomic —
// the ClosurePool rewrite uses atomic.Int32.Load instead.
func (p *Pool) Current() int {
	return int(p.current)
}
|||
|
|||
// Run (legacy, removed by this change) starts max worker goroutines
// and marks the pool Running; returns the pool for chaining.
func (p *Pool) Run() *Pool {
	// Guard against repeated calls.
	if p.state == Running {
		return p
	}
	p.state = Running
	var worker = func() {
		for p.state != Stopped {
			task, ok := <-p.queue
			// A closed queue forces the worker to exit.
			if !ok {
				break
			}
			atomic.AddInt32(
				&p.current,
				1,
			)
			// Run the task.
			task()
			atomic.AddInt32(
				&p.current,
				-1,
			)
			p.group.Done()
		}
	}
	// Start the workers.
	for i := 0; i < int(p.max); i++ {
		go worker()
	}
	return p
}
|||
|
|||
// Join (legacy, removed by this change) stops task intake and waits
// for all queued work to finish.
// NOTE(review): the 120ms polling loop on len(p.queue) duplicates what
// group.Wait already guarantees; the ClosurePool rewrite drops it.
func (p *Pool) Join() {
	// Guard against repeated calls.
	if p.state == Stopping || p.state == Stopped {
		return
	}
	close(p.queue)
	p.state = Stopping
	for {
		if len(p.queue) != 0 {
			time.Sleep(120 * time.Millisecond)
		} else {
			break
		}
	}
	p.group.Wait()
	p.state = Stopped
}
|||
|
|||
// Abort (legacy, removed by this change) stops the pool immediately,
// discarding queued tasks.
func (p *Pool) Abort() {
	// Guard against repeated calls.
	if p.state == Stopped {
		return
	}
	p.state = Stopped
	close(p.queue)
}
|||
|
|||
// Push (legacy, removed by this change) submits a task; panics when
// the pool is stopping or stopped.
func (p *Pool) Push(f func()) {
	if p.state == Stopped || p.state == Stopping {
		panic("在一个已经关闭了的线程上发送任务")
	}
	p.add(f)
	return
}
|||
|
|||
// Private

// add (legacy, removed by this change) is the enqueue implementation.
// NOTE(review): group.Add runs AFTER the send, so a fast worker could
// call Done before Add and panic the WaitGroup; the ClosurePool
// rewrite increments before sending.
func (p *Pool) add(f func()) {
	p.queue <- f
	p.group.Add(1)
	atomic.AddInt32(
		&p.queueCurrent,
		1,
	)
}
Loading…
Reference in new issue