This commit is contained in:
Bel LaPointe
2025-04-24 21:00:55 -06:00
parent 5a51ebf884
commit 0e0ade420a
4 changed files with 90 additions and 30 deletions

View File

@@ -39,36 +39,14 @@ func (p *Pool) Go(ctx context.Context, name string, foo func() error) error {
}
func (p *Pool) Wait(ctx context.Context) error {
waited := make(chan bool)
defer close(waited)
go func() {
c := time.NewTicker(100 * time.Millisecond)
defer c.Stop()
if p.jobs != nil {
for len(p.jobs) > 0 && ctx.Err() == nil {
select {
case <-ctx.Done():
case <-c.C:
}
}
close(p.jobs)
}
p.wg.Wait()
select {
case <-ctx.Done():
case waited <- true:
}
}()
select {
case <-ctx.Done():
return ctx.Err()
case <-waited:
p.jobs = nil
if err := p.close(ctx); err != nil {
return err
}
return p.Err()
}
func (p *Pool) Err() error {
if len(p.errs) == 0 {
return nil
}
@@ -142,3 +120,47 @@ func (p *Pool) withLock(foo func()) {
defer p.lock.Unlock()
foo()
}
func (p *Pool) close(ctx context.Context) error {
waited := make(chan bool)
go func() {
defer close(waited)
p.withLock(func() {
p._close(ctx)
})
select {
case <-ctx.Done():
case waited <- true:
}
}()
select {
case <-ctx.Done():
case <-waited:
}
return ctx.Err()
}
func (p *Pool) _close(ctx context.Context) {
if p.jobs != nil {
c := time.NewTicker(100 * time.Millisecond)
defer c.Stop()
for len(p.jobs) > 0 && ctx.Err() == nil {
select {
case <-ctx.Done():
case <-c.C:
}
}
func() {
defer func() { recover() }()
close(p.jobs)
}()
}
p.jobs = nil
p.wg.Wait()
}