diff --git a/proc/thread/pool/pool.go b/proc/thread/pool/pool.go new file mode 100644 index 0000000..d0d5402 --- /dev/null +++ b/proc/thread/pool/pool.go @@ -0,0 +1,134 @@ +package pool + +import ( + "sync" + "sync/atomic" + "time" + + "git.realxlfd.cc/RealXLFD/golib/utils/containers/queue" +) + +// Pool 线程池结构 +type Pool struct { + group *sync.WaitGroup // 当前并行任务数量,用于阻塞以等待完成 + current atomic.Int32 // 当前并行线程数量 + state State // 状态 + max int // 最大并行数量 + InQueue *queue.SyncQueue[any] + OutQueue *queue.SyncQueue[any] + Task func(any) any + exit chan bool +} + +// New 创建一个线程池 +func New(max int) *Pool { + if max < 0 { + panic("线程池最大并行数量为正整数!") + } + return &Pool{ + &sync.WaitGroup{}, + atomic.Int32{}, + Ready, + max, + queue.NewSync[any](), + queue.NewSync[any](), + func(any) any { return nil }, + make(chan bool), + } +} + +// State 获取池状态 +func (p *Pool) State() State { + return p.state +} + +// Max 获取池最大并行数量 +func (p *Pool) Max() int { + return p.max +} + +// Current 获取目前并行线程数 +func (p *Pool) Current() int { + return int(p.current.Load()) +} + +// Run 启动线程池,开始执行 +func (p *Pool) Run() *Pool { + // 防止重复调用 + if p.state == Running { + return p + } + p.state = Running + // 启动worker + for i := 0; i < p.max; i++ { + go p.worker() + } + return p +} + +func (p *Pool) worker() { + for p.state != Stopped { + input, exit := p.InQueue.SyncPop() + // 用于强制退出 + if exit { + break + } + p.current.Add(1) + // 启动任务线程 + p.OutQueue.Push(p.Task(input)) + p.current.Add(-1) + p.group.Done() + } +} + +// Join 等待完成,并且停止接受数据 +func (p *Pool) Join() { + // 防止重复调用 + if p.state == Stopping || p.state == Stopped { + return + } + p.InQueue.Exit() + p.state = Stopping + p.group.Wait() + for { + if p.InQueue.Size() != 0 { + time.Sleep(100 * time.Millisecond) + } else { + break + } + } + p.group.Wait() + p.state = Stopped +} + +// Abort 中断线程池 +func (p *Pool) Abort() { + // 防止重复调用 + if p.state == Stopped { + return + } + p.state = Stopped + p.InQueue.Exit() +} + +// Push 用于外界向线程池推送任务 +func (p *Pool) Push(input any) { + if p.state == Stopped || p.state == Stopping { + panic("在一个已经关闭了的线程上发送任务") + } + p.group.Add(1) + p.InQueue.Push(input) +} + +func (p *Pool) GetResult() any { + result, _ := p.OutQueue.SyncPop() + return result +} + +func (p *Pool) GetAllResult() (result []any) { + p.Join() + for p.OutQueue.Size() != 0 { + result = append(result, p.OutQueue.Pop()) + } + return result +} diff --git a/proc/thread/pool/pool_test.go b/proc/thread/pool/pool_test.go new file mode 100644 index 0000000..40856fe --- /dev/null +++ b/proc/thread/pool/pool_test.go @@ -0,0 +1,35 @@ +package pool + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestPool(t *testing.T) { + maxWorkers := 5 + pool := New(maxWorkers) + + assert.Equal(t, Ready, pool.State(), "线程池初始化状态应为Ready") + assert.Equal(t, 0, pool.Current(), "线程池初始化时没有运行的goroutine") + + // 设置任务:简单的数学运算 + pool.Task = func(input any) any { + n := input.(int) + return n * n + } + + // 启动线程池 + pool.Run() + + for i := 0; i < 10; i++ { + pool.Push(i) + } + + pool.Join() // 等待足够时间以完成所有任务 + + assert.Equal(t, Stopped, pool.State(), "线程池停止后状态应为Stopped") + + // 检查OutQueue中是否有10个结果 + assert.Equal(t, 10, pool.OutQueue.Size(), "应该有10个处理结果") +} diff --git a/proc/thread/pool/state.go b/proc/thread/pool/state.go new file mode 100644 index 0000000..7bb5a98 --- /dev/null +++ b/proc/thread/pool/state.go @@ -0,0 +1,11 @@ +package pool + +// State 标识线程池状态 +type State int + +const ( + Ready State = iota // 准备状态,池启动前的阶段 + Running // 运行 + Stopping // 停止前的的等待状态,等待剩余任务执行完成 + Stopped // 停止,此时强制暂停后续任务执行,添加则重新载入缓冲区 +) diff --git a/utils/containers/linked_list/builtin.go b/utils/containers/linked_list/builtin.go new file mode 100644 index 0000000..7995424 --- /dev/null +++ b/utils/containers/linked_list/builtin.go @@ -0,0 +1,21 @@ +package llist + +func (s *LinkedList[T]) findPrevNode() *Node[T] { + ptr := s.head + if ptr == nil { + return nil + } + var result *Node[T] = nil + for ptr.next != nil { + result = ptr + ptr = ptr.next + } + return result +} + +func createNode[T any](elem *T) *Node[T] { + return &Node[T]{ + elem: *elem, + next: nil, + } +} diff --git a/utils/containers/linked_list/index.go b/utils/containers/linked_list/index.go new file mode 100644 index 0000000..94612a8 --- /dev/null +++ b/utils/containers/linked_list/index.go @@ -0,0 +1,132 @@ +package llist + +func New[T any]() *LinkedList[T] { + return &LinkedList[T]{ + size: 0, + head: nil, + tail: nil, + } +} + +func (s *LinkedList[T]) Slice() []T { + slice := make([]T, s.size) + ptr := s.head + for index := range s.size { + slice[index] = ptr.elem + ptr = ptr.next + } + return slice +} + +func (s *LinkedList[T]) Push(elem T) *LinkedList[T] { + s.size++ + if s.tail == nil { + node := createNode(&elem) + s.head = node + s.tail = node + return s + } + node := createNode(&elem) + s.tail.next = node + s.tail = node + return s +} + +func (s *LinkedList[T]) Shift() T { + if s.head == nil { + panic("can't Shift() on a null list") + } + s.size-- + result := &s.head.elem + if s.head.next == nil { + s.head = nil + s.tail = nil + return *result + } + s.head = s.head.next + return *result +} + +func (s *LinkedList[T]) Pop() T { + if s.tail == nil { + panic("can't Pop() on a null list") + } + s.size-- + result := &s.tail.elem + prev := s.findPrevNode() + if prev == nil { + s.tail = nil + s.head = nil + return *result + } + prev.next = nil + s.tail = prev + return *result +} + +func (s *LinkedList[T]) Size() int { + return int(s.size) +} + +func (s *LinkedList[T]) Get(index int) T { + if index < 0 || uint(index) >= s.size { + panic("index out of list") + } + ptr := s.head + for range index { + ptr = ptr.next + } + return ptr.elem +} + +func (s *LinkedList[T]) Head() T { + if s.head == nil { + panic("can't Head() on a null list") + } + return s.head.elem +} + +func (s *LinkedList[T]) Add(elem T) *LinkedList[T] { + s.size++ + node := createNode(&elem) + node.next = s.head + if s.head == nil { + s.tail = node + } + s.head = node + return s +} + +func (s *LinkedList[T]) Tail() T { + if s.tail == nil { + panic("can't Tail() on a null list") + } + return s.tail.elem +} + +func (s *LinkedList[T]) Copy() *LinkedList[T] { + newList := new(LinkedList[T]) + newList.size = s.size + ptr := s.head + if ptr == nil { + newList.head = nil + newList.tail = nil + return newList + } + newNode := createNode(&ptr.elem) + newNode.next = ptr.next + newList.head = newNode + newList.tail = newNode + ptr = ptr.next + for ptr != nil { + newNode = createNode(&ptr.elem) + newNode.next = ptr.next + newList.tail = ptr + ptr = ptr.next + } + return newList +} + +func (s *LinkedList[T]) IsNull() bool { + return s.head == nil +} diff --git a/utils/containers/queue/queue.go b/utils/containers/queue/queue.go new file mode 100644 index 0000000..63042af --- /dev/null +++ b/utils/containers/queue/queue.go @@ -0,0 +1,57 @@ +package queue + +const ( + InitialCapacity = 10 + InitialEndure +) + +type Queue[T any] struct { + buf []T + endure int + head int +} + +func (q *Queue[T]) Size() int { + return len(q.buf) - q.head +} + +func (q *Queue[T]) Push(elem ...T) { + q.buf = append(q.buf, elem...) +} + +func (q *Queue[T]) Pop() T { + if len(q.buf) == q.head || len(q.buf) == 0 { + panic("the queue is null") + } + result := q.buf[q.head] + q.head++ + if q.head >= q.endure { + q.relocate() + } + return result +} + +func (q *Queue[T]) Head() T { + if len(q.buf) == q.head || len(q.buf) == 0 { + panic("the queue is null") + } + return q.buf[q.head] +} + +// 0 1 2 3 +// 1 2 3 +func (q *Queue[T]) relocate() { + q.endure = len(q.buf) / 2 + newData := make([]T, len(q.buf)-q.head) + copy(newData, q.buf[q.head:]) + q.head = 0 + q.buf = newData +} + +func New[T any]() *Queue[T] { + return &Queue[T]{ + make([]T, 0, InitialCapacity), + InitialEndure, + 0, + } +} diff --git a/utils/containers/queue/queue_test.go b/utils/containers/queue/queue_test.go new file mode 100644 index 0000000..c0936b5 --- /dev/null +++ b/utils/containers/queue/queue_test.go @@ -0,0 +1,88 @@ +package queue + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestQueuePushAndSize(t *testing.T) { + q := New[int]() + q.Push(1, 2, 3) + assert.Equal( + t, + 3, + q.Size(), + "Queue size should be 3 after pushing 3 elements", + ) +} + +func TestQueuePop(t *testing.T) { + q := New[int]() + q.Push(1, 2, 3) + pop1 := q.Pop() + assert.Equal(t, 1, pop1, "Expected to pop 1 as the first element") + assert.Equal(t, 2, q.Size(), "Queue size should be 2 after one pop") + pop2 := q.Pop() + assert.Equal(t, 2, pop2, "Expected to pop 2 as the second element") +} + +func TestQueueHead(t *testing.T) { + q := New[int]() + q.Push(1, 2, 3) + head := q.Head() + assert.Equal(t, 1, head, "Expected head to be 1") +} + +func TestQueuePopEmpty(t *testing.T) { + q := New[int]() + assert.Panics( + t, + func() { q.Pop() }, + "Expected Pop to panic on empty queue", + ) +} + +func TestQueueHeadEmpty(t *testing.T) { + q := New[int]() + assert.Panics( + t, + func() { q.Head() }, + "Expected Head to panic on empty queue", + ) +} + +// 性能测试 +func BenchmarkQueuePush(b *testing.B) { + q := New[int]() + for i := 0; i < b.N; i++ { + q.Push(i) + } +} + +func BenchmarkQueuePop(b *testing.B) { + q := New[int]() + for i := 0; i < b.N; i++ { + q.Push(i) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + q.Pop() + } +} + +func BenchmarkQueue_All(b *testing.B) { + q := New[int]() + for i := 0; i < b.N; i++ { + q.Push(i) + } + for i := 0; i < b.N/3*2; i++ { + q.Pop() + } + for i := 0; i < b.N/2; i++ { + q.Push(i) + } + for i := 0; i < b.N/2; i++ { + q.Pop() + } +} diff --git a/utils/containers/queue/sync_queue.go b/utils/containers/queue/sync_queue.go new file mode 100644 index 0000000..af541d4 --- /dev/null +++ b/utils/containers/queue/sync_queue.go @@ -0,0 +1,75 @@ +package queue + +import ( + "sync" +) + +type SyncQueue[T any] struct { + lock *sync.Mutex + cond *sync.Cond + exit bool + queue *Queue[T] +} + +func NewSync[T any]() *SyncQueue[T] { + queue := New[T]() + lock := &sync.Mutex{} + return &SyncQueue[T]{ + lock, + sync.NewCond(lock), + false, + queue, + } +} +func (s *SyncQueue[T]) Size() int { + return s.queue.Size() +} + +func (s *SyncQueue[T]) Push(elems ...T) { + s.lock.Lock() + defer s.lock.Unlock() + s.queue.Push(elems...) + s.cond.Signal() +} +func (s *SyncQueue[T]) Exit() { + s.lock.Lock() + defer s.lock.Unlock() + s.cond.Broadcast() +} +func (s *SyncQueue[T]) SyncPop() (T, bool) { + s.lock.Lock() + defer s.lock.Unlock() + for s.Size() == 0 { + s.cond.Wait() + if s.exit { + var t T + return t, true + } + } + return s.queue.Pop(), false +} + +func (s *SyncQueue[T]) Pop() T { + s.lock.Lock() + defer s.lock.Unlock() + return s.queue.Pop() +} + +func (s *SyncQueue[T]) SyncHead() (T, bool) { + s.lock.Lock() + defer s.lock.Unlock() + for s.Size() == 0 { + s.cond.Wait() + if s.exit { + var t T + return t, true + } + } + return s.queue.Head(), false +} + +func (s *SyncQueue[T]) Head() T { + s.lock.Lock() + defer s.lock.Unlock() + return s.queue.Head() +} diff --git a/utils/containers/queue/sync_queue_test.go b/utils/containers/queue/sync_queue_test.go new file mode 100644 index 0000000..a091861 --- /dev/null +++ b/utils/containers/queue/sync_queue_test.go @@ -0,0 +1,87 @@ +package queue + +import ( + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestSyncQueue(t *testing.T) { + q := NewSync[int]() + assert.Equal(t, 0, q.Size()) + + q.Push(1, 2, 3, 4) + assert.Equal(t, 4, q.Size()) + + head := q.Head() + assert.Equal(t, 1, head) + + popped := q.Pop() + assert.Equal(t, 1, popped) + assert.Equal(t, 3, q.Size()) +} + +func TestSyncQueueConcurrent(t *testing.T) { + q := NewSync[int]() + wg := sync.WaitGroup{} + wg.Add(2) + + // Producer + go func() { + defer wg.Done() + q.Push(1) + time.Sleep(100 * time.Millisecond) + q.Push(2) + }() + + // Consumer + go func() { + defer wg.Done() + first, _ := q.SyncPop() + assert.Equal(t, 1, first) + + second, _ := q.SyncPop() + assert.Equal(t, 2, second) + }() + + wg.Wait() +} + +func BenchmarkSyncQueuePushPop(b *testing.B) { + q := NewSync[int]() + wg := sync.WaitGroup{} + wg.Add(2) + + b.RunParallel( + func(pb *testing.PB) { + for pb.Next() { + wg.Add(1) + go func() { + defer wg.Done() + q.Push(1) + _ = q.Pop() + }() + } + }, + ) + + wg.Wait() +} + +func BenchmarkSyncQueueSyncPop(b *testing.B) { + q := NewSync[int]() + for i := 0; i < b.N; i++ { + q.Push(i) + } + b.ResetTimer() + + b.RunParallel( + func(pb *testing.PB) { + for pb.Next() { + _, _ = q.SyncPop() + } + }, + ) +} diff --git a/utils/containers/stack/stack.go b/utils/containers/stack/stack.go new file mode 100644 index 0000000..c5c712f --- /dev/null +++ b/utils/containers/stack/stack.go @@ -0,0 +1,44 @@ +package stack + +const ( + InitialStackCapacity = 10 +) + +type Stack[T any] struct { + data []T + size int +} + +func New[T any]() *Stack[T] { + return &Stack[T]{ + data: make([]T, 0, InitialStackCapacity), + size: 0, + } +} + +func (s *Stack[T]) Push(elems ...T) *Stack[T] { + s.data = append(s.data, elems...) + s.size += len(elems) + return s +} + +func (s *Stack[T]) Pop() T { + if s.size == 0 { + panic("the stack is null") + } + s.size-- + result := s.data[s.size] + s.data = s.data[:s.size] + return result +} + +func (s *Stack[T]) Top() T { + if s.size == 0 { + panic("the stack is null") + } + return s.data[s.size-1] +} + +func (s *Stack[T]) Size() int { + return s.size +} diff --git a/utils/containers/stack/stack_test.go b/utils/containers/stack/stack_test.go new file mode 100644 index 0000000..f6ccbd6 --- /dev/null +++ b/utils/containers/stack/stack_test.go @@ -0,0 +1,78 @@ +package stack + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestStack_PushPop(t *testing.T) { + s := New[int]() + s.Push(1) + assert.Equal( + t, + 1, + s.Size(), + "The size should be 1 after pushing an element", + ) + + popped := s.Pop() + assert.Equal( + t, + 1, + popped, + "The popped element should be equal to the last pushed element", + ) + assert.Equal( + t, + 0, + s.Size(), + "The size should be 0 after popping the last element", + ) +} + +func TestStack_Top(t *testing.T) { + s := New[string]() + s.Push("hello") + top := s.Top() + assert.Equal(t, "hello", top, "The top element should be 'hello'") +} + +func TestStack_PopEmpty(t *testing.T) { + s := New[float64]() + s.Push(0.1, 0.2) + s.Pop() + s.Pop() + assert.Panics( + t, + func() { s.Pop() }, + "Popping from an empty stack should panic", + ) +} + +func TestStack_TopEmpty(t *testing.T) { + s := New[bool]() + assert.Panics( + t, + func() { s.Top() }, + "Calling Top on an empty stack should panic", + ) +} + +func BenchmarkStack_PushPop(b *testing.B) { + s := New[int]() + for i := 0; i < b.N; i++ { + s.Push(i) + } + for i := 0; i < b.N; i++ { + _ = s.Pop() + } +} + +func BenchmarkStack_Top(b *testing.B) { + s := New[int]() + s.Push(1) + for i := 0; i < b.N; i++ { + _ = s.Top() + } +}