diff --git a/cmd/pg-pubsub-demo/main.go b/cmd/pg-pubsub-demo/main.go index 2ade756..241b9ab 100644 --- a/cmd/pg-pubsub-demo/main.go +++ b/cmd/pg-pubsub-demo/main.go @@ -221,23 +221,23 @@ func run(ctx context.Context) error { ) }) - go with.Every(ctx, 1, func() { - for i := 0; i < 2; i++ { - topic := fmt.Sprintf("topic_%d", i) + for i := 0; i < 2; i++ { + topic := fmt.Sprintf("topic_%d", i) + go with.Every(ctx, 1, func() { if err := pub(topic, 1); err != nil { log.Printf("failed pub: %v", err) } else { pubs.Add(1) } - } - }) + }) + } - with.Every(ctx, 1, func() { - if err := func() error { - for i := 0; i < 2; i++ { - topic := fmt.Sprintf("topic_%d", i) - for j := 0; j < 2; j++ { - group := fmt.Sprintf("group_%d", i) + for i := 0; i < 2; i++ { + topic := fmt.Sprintf("topic_%d", i) + for j := 0; j < 2; j++ { + group := fmt.Sprintf("group_%d", i) + go with.Every(ctx, 1, func() { + if err := func() error { if partition, offset, _, err := sub(topic, group); err != nil { return fmt.Errorf("failed sub: %w", err) } else if partition == -1 { @@ -247,14 +247,15 @@ func run(ctx context.Context) error { subs.Add(1) lastCommit = fmt.Sprintf("%s/%s/%d@%d", topic, group, partition, offset) } + return nil + }(); err != nil { + log.Printf("failed subs: %v", err) } - } - return nil - }(); err != nil { - log.Printf("failed subs: %v", err) + }) } - }) + } + <-ctx.Done() return ctx.Err() }) }