diff --git a/src/cmd/cron.go b/src/cmd/cron.go new file mode 100644 index 0000000..f5ce4cc --- /dev/null +++ b/src/cmd/cron.go @@ -0,0 +1,10 @@ +package cmd + +import ( + "context" + "io" +) + +func cron(ctx context.Context) error { + return io.EOF +} diff --git a/src/cmd/main.go b/src/cmd/main.go index c937ece..6b3ace7 100644 --- a/src/cmd/main.go +++ b/src/cmd/main.go @@ -2,9 +2,27 @@ package cmd import ( "context" - "io" + "fmt" + "show-rss/src/pool" ) func Main(ctx context.Context) error { - return io.EOF + ctx, can := context.WithCancel(ctx) + defer can() + + foos := map[string]func() error{ + "server": func() error { return server(ctx) }, + "cron": func() error { return cron(ctx) }, + } + p := pool.New(len(foos)) + defer p.Wait(ctx) + + for k, foo := range foos { + foo := foo + if err := p.Go(ctx, k, foo); err != nil { + return fmt.Errorf("failed to go %s: %v", k, err) + } + } + + return p.Wait(ctx) } diff --git a/src/cmd/server.go b/src/cmd/server.go new file mode 100644 index 0000000..4525918 --- /dev/null +++ b/src/cmd/server.go @@ -0,0 +1,10 @@ +package cmd + +import ( + "context" + "io" +) + +func server(ctx context.Context) error { + return io.EOF +} diff --git a/src/pool/pool.go b/src/pool/pool.go index 5faee9f..9b8409e 100644 --- a/src/pool/pool.go +++ b/src/pool/pool.go @@ -39,36 +39,14 @@ func (p *Pool) Go(ctx context.Context, name string, foo func() error) error { } func (p *Pool) Wait(ctx context.Context) error { - waited := make(chan bool) - defer close(waited) - go func() { - c := time.NewTicker(100 * time.Millisecond) - defer c.Stop() - - if p.jobs != nil { - for len(p.jobs) > 0 && ctx.Err() == nil { - select { - case <-ctx.Done(): - case <-c.C: - } - } - close(p.jobs) - } - - p.wg.Wait() - select { - case <-ctx.Done(): - case waited <- true: - } - }() - - select { - case <-ctx.Done(): - return ctx.Err() - case <-waited: - p.jobs = nil + if err := p.close(ctx); err != nil { + return err } + return p.Err() +} + +func (p *Pool) Err() error { if len(p.errs) == 0 { return nil } @@ -142,3 +120,47 @@ func (p *Pool) withLock(foo func()) { defer p.lock.Unlock() foo() } + +func (p *Pool) close(ctx context.Context) error { + waited := make(chan bool) + go func() { + defer close(waited) + + p.withLock(func() { + p._close(ctx) + }) + + select { + case <-ctx.Done(): + case waited <- true: + } + }() + + select { + case <-ctx.Done(): + case <-waited: + } + + return ctx.Err() +} + +func (p *Pool) _close(ctx context.Context) { + if p.jobs != nil { + c := time.NewTicker(100 * time.Millisecond) + defer c.Stop() + + for len(p.jobs) > 0 && ctx.Err() == nil { + select { + case <-ctx.Done(): + case <-c.C: + } + } + func() { + defer func() { recover() }() + close(p.jobs) + }() + } + p.jobs = nil + + p.wg.Wait() +}