Browse Source

add a lot

master
realxlfd 2 months ago
parent
commit
73fd164275
  1. 134
      proc/thread/pool/pool.go
  2. 35
      proc/thread/pool/pool_test.go
  3. 11
      proc/thread/pool/state.go
  4. 21
      utils/containers/linked_list/builtin.go
  5. 132
      utils/containers/linked_list/index.go
  6. 57
      utils/containers/queue/queue.go
  7. 88
      utils/containers/queue/queue_test.go
  8. 75
      utils/containers/queue/sync_queue.go
  9. 87
      utils/containers/queue/sync_queue_test.go
  10. 44
      utils/containers/stack/stack.go
  11. 78
      utils/containers/stack/stack_test.go

134
proc/thread/pool/pool.go

@ -0,0 +1,134 @@
package pool
import (
"sync"
"sync/atomic"
"time"
"git.realxlfd.cc/RealXLFD/golib/utils/containers/queue"
)
// Pool 线程池结构
type Pool struct {
group *sync.WaitGroup // 当前并行任务数量,用于阻塞以等待完成
current atomic.Int32 // 当前并行线程数量
state State // 状态
max int // 最大并行数量
InQueue *queue.SyncQueue[any]
OutQueue *queue.SyncQueue[any]
Task func(any) any
exit chan bool
}
// New 创建一个线程池
func New(max int) *Pool {
if max < 0 {
panic("线程池最大并行数量为正整数!")
}
return &Pool{
&sync.WaitGroup{},
atomic.Int32{},
Ready,
max,
queue.NewSync[any](),
queue.NewSync[any](),
func(any) any { return nil },
make(chan bool),
}
}
// State 获取池状态
func (p *Pool) State() State {
return p.state
}
// Max 获取池最大并行数量
func (p *Pool) Max() int {
return p.max
}
// Current 获取目前并行线程数
func (p *Pool) Current() int {
return int(p.current.Load())
}
// Run 启动线程池,开始执行
func (p *Pool) Run() *Pool {
// 防止重复调用
if p.state == Running {
return p
}
p.state = Running
// 启动worker
for i := 0; i < p.max; i++ {
go p.worker()
}
return p
}
func (p *Pool) worker() {
for p.state != Stopped {
input, exit := p.InQueue.SyncPop()
// 用于强制退出
if exit {
break
}
p.current.Add(1)
// 启动任务线程
p.OutQueue.Push(p.Task(input))
p.current.Add(-1)
p.group.Done()
}
}
// Join 等待完成,并且停止接受数据
func (p *Pool) Join() {
// 防止重复调用
if p.state == Stopping || p.state == Stopped {
return
}
p.InQueue.Exit()
p.state = Stopping
p.group.Wait()
for {
if p.InQueue.Size() != 0 {
time.Sleep(100 * time.Millisecond)
} else {
break
}
}
p.group.Wait()
p.state = Stopped
}
// Abort 中断线程池
func (p *Pool) Abort() {
// 防止重复调用
if p.state == Stopped {
return
}
p.state = Stopped
p.InQueue.Exit()
}
// Push 用于外界向线程池推送任务
func (p *Pool) Push(input any) {
if p.state == Stopped || p.state == Stopping {
panic("在一个已经关闭了的线程上发送任务")
}
p.group.Add(1)
p.InQueue.Push(input)
}
func (p *Pool) GetResult() any {
result, _ := p.OutQueue.SyncPop()
return result
}
func (p *Pool) GetAllResult() (result []any) {
p.Join()
for p.OutQueue.Size() != 0 {
result = append(result, p.OutQueue.Pop())
}
return result
}

35
proc/thread/pool/pool_test.go

@ -0,0 +1,35 @@
package pool
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestPool(t *testing.T) {
maxWorkers := 5
pool := New(maxWorkers)
assert.Equal(t, Ready, pool.State(), "线程池初始化状态应为Ready")
assert.Equal(t, 0, pool.Current(), "线程池初始化时没有运行的goroutine")
// 设置任务:简单的数学运算
pool.Task = func(input any) any {
n := input.(int)
return n * n
}
// 启动线程池
pool.Run()
for i := 0; i < 10; i++ {
pool.Push(i)
}
pool.Join() // 等待足够时间以完成所有任务
assert.Equal(t, Stopped, pool.State(), "线程池停止后状态应为Stopped")
// 检查OutQueue中是否有10个结果
assert.Equal(t, 10, pool.OutQueue.Size(), "应该有10个处理结果")
}

11
proc/thread/pool/state.go

@ -0,0 +1,11 @@
package pool
// State 标识线程池状态
type State int
const (
Ready State = iota // 准备状态,池启动前的阶段
Running // 运行
Stopping // 停止前的的等待状态,等待剩余任务执行完成
Stopped // 停止,此时强制暂停后续任务执行,添加则重新载入缓冲区
)

21
utils/containers/linked_list/builtin.go

@ -0,0 +1,21 @@
package llist
func (s *LinkedList[T]) findPrevNode() *Node[T] {
ptr := s.head
if ptr == nil {
return nil
}
var result *Node[T] = nil
for ptr.next != nil {
result = ptr
ptr = ptr.next
}
return result
}
func createNode[T any](elem *T) *Node[T] {
return &Node[T]{
elem: *elem,
next: nil,
}
}

132
utils/containers/linked_list/index.go

@ -0,0 +1,132 @@
package llist
func New[T any]() *LinkedList[T] {
return &LinkedList[T]{
size: 0,
head: nil,
tail: nil,
}
}
func (s *LinkedList[T]) Slice() []T {
slice := make([]T, s.size)
ptr := s.head
for index := range s.size {
slice[index] = ptr.elem
ptr = ptr.next
}
return slice
}
func (s *LinkedList[T]) Push(elem T) *LinkedList[T] {
s.size++
if s.tail == nil {
node := createNode(&elem)
s.head = node
s.tail = node
return s
}
node := createNode(&elem)
s.tail.next = node
s.tail = node
return s
}
func (s *LinkedList[T]) Shift() T {
if s.head == nil {
panic("can't Shift() on a null list")
}
s.size--
result := &s.head.elem
if s.head.next == nil {
s.head = nil
s.tail = nil
return *result
}
s.head = s.head.next
return *result
}
func (s *LinkedList[T]) Pop() T {
if s.tail == nil {
panic("can't Pop() on a null list")
}
s.size--
result := &s.tail.elem
prev := s.findPrevNode()
if prev == nil {
s.tail = nil
s.head = nil
return *result
}
prev.next = nil
s.tail = prev
return *result
}
func (s *LinkedList[T]) Size() int {
return int(s.size)
}
func (s *LinkedList[T]) Get(index int) T {
if index < 0 || uint(index) >= s.size {
panic("index out of list")
}
ptr := s.head
for range index {
ptr = ptr.next
}
return ptr.elem
}
func (s *LinkedList[T]) Head() T {
if s.head == nil {
panic("can't Head() on a null list")
}
return s.head.elem
}
func (s *LinkedList[T]) Add(elem T) *LinkedList[T] {
s.size++
node := createNode(&elem)
node.next = s.head
if s.head == nil {
s.tail = node
}
s.head = node
return s
}
func (s *LinkedList[T]) Tail() T {
if s.tail == nil {
panic("can't Tail() on a null list")
}
return s.tail.elem
}
func (s *LinkedList[T]) Copy() *LinkedList[T] {
newList := new(LinkedList[T])
newList.size = s.size
ptr := s.head
if ptr == nil {
newList.head = nil
newList.tail = nil
return newList
}
newNode := createNode(&ptr.elem)
newNode.next = ptr.next
newList.head = newNode
newList.tail = newNode
ptr = ptr.next
for ptr != nil {
newNode = createNode(&ptr.elem)
newNode.next = ptr.next
newList.tail = ptr
ptr = ptr.next
}
return newList
}
func (s *LinkedList[T]) IsNull() bool {
return s.head == nil
}

57
utils/containers/queue/queue.go

@ -0,0 +1,57 @@
package queue
const (
InitialCapacity = 10
InitialEndure
)
type Queue[T any] struct {
buf []T
endure int
head int
}
func (q *Queue[T]) Size() int {
return len(q.buf) - q.head
}
func (q *Queue[T]) Push(elem ...T) {
q.buf = append(q.buf, elem...)
}
func (q *Queue[T]) Pop() T {
if len(q.buf) == q.head || len(q.buf) == 0 {
panic("the queue is null")
}
result := q.buf[q.head]
q.head++
if q.head >= q.endure {
q.relocate()
}
return result
}
func (q *Queue[T]) Head() T {
if len(q.buf) == q.head || len(q.buf) == 0 {
panic("the queue is null")
}
return q.buf[q.head]
}
// 0 1 2 3
// 1 2 3
func (q *Queue[T]) relocate() {
q.endure = len(q.buf) / 2
newData := make([]T, len(q.buf)-q.head)
copy(newData, q.buf[q.head:])
q.head = 0
q.buf = newData
}
func New[T any]() *Queue[T] {
return &Queue[T]{
make([]T, 0, InitialCapacity),
InitialEndure,
0,
}
}

88
utils/containers/queue/queue_test.go

@ -0,0 +1,88 @@
package queue
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestQueuePushAndSize(t *testing.T) {
q := New[int]()
q.Push(1, 2, 3)
assert.Equal(
t,
3,
q.Size(),
"Queue size should be 3 after pushing 3 elements",
)
}
func TestQueuePop(t *testing.T) {
q := New[int]()
q.Push(1, 2, 3)
pop1 := q.Pop()
assert.Equal(t, 1, pop1, "Expected to pop 1 as the first element")
assert.Equal(t, 2, q.Size(), "Queue size should be 2 after one pop")
pop2 := q.Pop()
assert.Equal(t, 2, pop2, "Expected to pop 2 as the second element")
}
func TestQueueHead(t *testing.T) {
q := New[int]()
q.Push(1, 2, 3)
head := q.Head()
assert.Equal(t, 1, head, "Expected head to be 1")
}
func TestQueuePopEmpty(t *testing.T) {
q := New[int]()
assert.Panics(
t,
func() { q.Pop() },
"Expected Pop to panic on empty queue",
)
}
func TestQueueHeadEmpty(t *testing.T) {
q := New[int]()
assert.Panics(
t,
func() { q.Head() },
"Expected Head to panic on empty queue",
)
}
// 性能测试
func BenchmarkQueuePush(b *testing.B) {
q := New[int]()
for i := 0; i < b.N; i++ {
q.Push(i)
}
}
func BenchmarkQueuePop(b *testing.B) {
q := New[int]()
for i := 0; i < b.N; i++ {
q.Push(i)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
q.Pop()
}
}
func BenchmarkQueue_All(b *testing.B) {
q := New[int]()
for i := 0; i < b.N; i++ {
q.Push(i)
}
for i := 0; i < b.N/3*2; i++ {
q.Pop()
}
for i := 0; i < b.N/2; i++ {
q.Push(i)
}
for i := 0; i < b.N/2; i++ {
q.Pop()
}
}

75
utils/containers/queue/sync_queue.go

@ -0,0 +1,75 @@
package queue
import (
"sync"
)
type SyncQueue[T any] struct {
lock *sync.Mutex
cond *sync.Cond
exit bool
queue *Queue[T]
}
func NewSync[T any]() *SyncQueue[T] {
queue := New[T]()
lock := &sync.Mutex{}
return &SyncQueue[T]{
lock,
sync.NewCond(lock),
false,
queue,
}
}
func (s *SyncQueue[T]) Size() int {
return s.queue.Size()
}
func (s *SyncQueue[T]) Push(elems ...T) {
s.lock.Lock()
defer s.lock.Unlock()
s.queue.Push(elems...)
s.cond.Signal()
}
func (s *SyncQueue[T]) Exit() {
s.lock.Lock()
defer s.lock.Unlock()
s.cond.Broadcast()
}
func (s *SyncQueue[T]) SyncPop() (T, bool) {
s.lock.Lock()
defer s.lock.Unlock()
for s.Size() == 0 {
s.cond.Wait()
if s.exit {
var t T
return t, true
}
}
return s.queue.Pop(), false
}
func (s *SyncQueue[T]) Pop() T {
s.lock.Lock()
defer s.lock.Unlock()
return s.queue.Pop()
}
func (s *SyncQueue[T]) SyncHead() (T, bool) {
s.lock.Lock()
defer s.lock.Unlock()
for s.Size() == 0 {
s.cond.Wait()
if s.exit {
var t T
return t, true
}
}
return s.queue.Head(), false
}
func (s *SyncQueue[T]) Head() T {
s.lock.Lock()
defer s.lock.Unlock()
return s.queue.Head()
}

87
utils/containers/queue/sync_queue_test.go

@ -0,0 +1,87 @@
package queue
import (
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestSyncQueue(t *testing.T) {
q := NewSync[int]()
assert.Equal(t, 0, q.Size())
q.Push(1, 2, 3, 4)
assert.Equal(t, 4, q.Size())
head := q.Head()
assert.Equal(t, 1, head)
popped := q.Pop()
assert.Equal(t, 1, popped)
assert.Equal(t, 3, q.Size())
}
func TestSyncQueueConcurrent(t *testing.T) {
q := NewSync[int]()
wg := sync.WaitGroup{}
wg.Add(2)
// Producer
go func() {
defer wg.Done()
q.Push(1)
time.Sleep(100 * time.Millisecond)
q.Push(2)
}()
// Consumer
go func() {
defer wg.Done()
first, _ := q.SyncPop()
assert.Equal(t, 1, first)
second, _ := q.SyncPop()
assert.Equal(t, 2, second)
}()
wg.Wait()
}
func BenchmarkSyncQueuePushPop(b *testing.B) {
q := NewSync[int]()
wg := sync.WaitGroup{}
wg.Add(2)
b.RunParallel(
func(pb *testing.PB) {
for pb.Next() {
wg.Add(1)
go func() {
defer wg.Done()
q.Push(1)
_ = q.Pop()
}()
}
},
)
wg.Wait()
}
func BenchmarkSyncQueueSyncPop(b *testing.B) {
q := NewSync[int]()
for i := 0; i < b.N; i++ {
q.Push(i)
}
b.ResetTimer()
b.RunParallel(
func(pb *testing.PB) {
for pb.Next() {
_, _ = q.SyncPop()
}
},
)
}

44
utils/containers/stack/stack.go

@ -0,0 +1,44 @@
package stack
const (
InitialStackCapacity = 10
)
type Stack[T any] struct {
data []T
size int
}
func New[T any]() *Stack[T] {
return &Stack[T]{
data: make([]T, 0, InitialStackCapacity),
size: 0,
}
}
func (s *Stack[T]) Push(elems ...T) *Stack[T] {
s.data = append(s.data, elems...)
s.size += len(elems)
return s
}
func (s *Stack[T]) Pop() T {
if s.size == 0 {
panic("the stack is null")
}
s.size--
result := s.data[s.size]
s.data = s.data[:s.size]
return result
}
func (s *Stack[T]) Top() T {
if s.size == 0 {
panic("the stack is null")
}
return s.data[s.size-1]
}
func (s *Stack[T]) Size() int {
return s.size
}

78
utils/containers/stack/stack_test.go

@ -0,0 +1,78 @@
package stack
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestStack_PushPop(t *testing.T) {
s := New[int]()
s.Push(1)
assert.Equal(
t,
1,
s.Size(),
"The size should be 1 after pushing an element",
)
popped := s.Pop()
assert.Equal(
t,
1,
popped,
"The popped element should be equal to the last pushed element",
)
assert.Equal(
t,
0,
s.Size(),
"The size should be 0 after popping the last element",
)
}
func TestStack_Top(t *testing.T) {
s := New[string]()
s.Push("hello")
top := s.Top()
assert.Equal(t, "hello", top, "The top element should be 'hello'")
}
func TestStack_PopEmpty(t *testing.T) {
s := New[float64]()
s.Push(0.1, 0.2)
s.Pop()
s.Pop()
assert.Panics(
t,
func() { s.Pop() },
"Popping from an empty stack should panic",
)
}
func TestStack_TopEmpty(t *testing.T) {
s := New[bool]()
assert.Panics(
t,
func() { s.Top() },
"Calling Top on an empty stack should panic",
)
}
func BenchmarkStack_PushPop(b *testing.B) {
s := New[int]()
for i := 0; i < b.N; i++ {
s.Push(i)
}
for i := 0; i < b.N; i++ {
_ = s.Pop()
}
}
func BenchmarkStack_Top(b *testing.B) {
s := New[int]()
s.Push(1)
for i := 0; i < b.N; i++ {
_ = s.Top()
}
}
Loading…
Cancel
Save