// Package main is the main package. package main import ( "flag" "fmt" "log" "math/rand" "net/http" _ "net/http/pprof" "os" "runtime" "runtime/pprof" "time" "github.com/eiannone/keyboard" metrics "github.com/tevjef/go-runtime-metrics" qpcl "gitlab-app.eng.qops.net/golang/qmp" qsl "gitlab-app.eng.qops.net/golang/qmp/qsl" ) // Log logs stuff. func Log(a ...interface{}) { s := fmt.Sprintf("%v", a) fmt.Fprintln(os.Stderr, s[1:len(s)-1]) } func main() { if _, ok := os.LookupEnv("SCHEMA_REGISTRY_URL"); !ok { log.Println("setting schema_registry_url") os.Setenv("SCHEMA_REGISTRY_URL", "localhost:8081") os.Setenv("SCHEMA_REGISTRY_URL", "qmp-schema-registry.service.b1-prv.consul:8081") } if _, ok := os.LookupEnv("QPL_BULK_BOOTSTRAP_SERVERS"); !ok { log.Println("setting qpl_bulk_bootstrap_servers") os.Setenv("QPL_BULK_BOOTSTRAP_SERVERS", "localhost:9092") os.Setenv("QPL_BULK_BOOTSTRAP_SERVERS", "core-kafka.service.b1-prv.consul:9092") } os.Setenv("QPL_DURABLE_BOOTSTRAP_SERVERS", os.Getenv("QPL_BULK_BOOTSTRAP_SERVERS")) os.Setenv("QCL_BULK_BOOTSTRAP_SERVERS", os.Getenv("QPL_BULK_BOOTSTRAP_SERVERS")) os.Setenv("QCL_RECORD_STATE_BOOTSTRAP_SERVERS", os.Getenv("QPL_BULK_BOOTSTRAP_SERVERS")) os.Setenv("QCL_BACKGROUND_JOB_BOOTSTRAP_SERVERS", os.Getenv("QPL_BULK_BOOTSTRAP_SERVERS")) os.Setenv("QCL_IDEMPOTENT_BACKGROUND_JOB_BOOTSTRAP_SERVERS", os.Getenv("QPL_BULK_BOOTSTRAP_SERVERS")) fmt.Println("\n---") fmt.Println("HELLO AND WELCOME. Ctrl-C won't save you now.") fmt.Println("* p create another producer") fmt.Println("* o delete a producer") fmt.Println("* c create another consumer") fmt.Println("* x delete a consumer") fmt.Println("* u queue a configuration update") fmt.Println("* q quit") fmt.Println("---\n") fmt.Println("--consumers n // start with n consumers") fmt.Println("--producers n // start with n producers") fmt.Println("--lag n // wait n seconds between messages produced") fmt.Println("--group x // consume as group x") fmt.Println("--topic x // producer/consume from topic x") fmt.Println("\n---\n") // attempt influxDB/grafana but don't care if it fails metrics.DefaultConfig.CollectionInterval = time.Millisecond * 100 metrics.DefaultConfig.BatchInterval = time.Second * 1 if err := metrics.RunCollector(metrics.DefaultConfig); err != nil { Log(err) } rand.Seed(time.Now().UTC().UnixNano()) // configuration conCnt := 1 proCnt := 1 lag := 1 group := "gogroup" topic := "my-topic-1" mylogger := false flag.IntVar(&conCnt, "consumers", conCnt, "how many consumers to start") flag.IntVar(&proCnt, "producers", proCnt, "how many producers to start") flag.IntVar(&lag, "lag", lag, "seconds between prdoucer put()s") flag.StringVar(&group, "group", group, "group name") flag.StringVar(&topic, "topic", topic, "topic name") flag.BoolVar(&mylogger, "pychart", mylogger, "print stats to stdout") cpuprofile := flag.String("cpuprofile", "", "write cpu profile to file") memprofile := flag.String("memprofile", "", "write memory profile to file") flag.Parse() // if profiling if *cpuprofile != "" { f, err := os.Create(*cpuprofile) if err != nil { panic(err) } go func() { Log(http.ListenAndServe("localhost:6060", nil)) }() pprof.StartCPUProfile(f) defer pprof.StopCPUProfile() } // channels conStop := make(chan rune) proStop := make(chan rune) confirmation := make(chan error, 10) consumed := make(chan int, 1000) produced := make(chan int, 1000) doUpdate := make(chan bool, 1) // Register App for err := qsl.RegisterApplication("qsl-go"); err != nil; err = qsl.RegisterApplication("qsl-go") { Log("QSL Register failed with:", err) time.Sleep(time.Second) Log("Trying QSL.RegisterApplication() again") } for err := qpcl.RegisterApplication("qsl-go"); err != nil; err = qpcl.RegisterApplication("qsl-go") { Log("QPCL Register failed with:", err) time.Sleep(time.Second) Log("Trying QPCL.RegisterApplication() again") } globalStart := time.Now() // Launch for i := 0; i < proCnt; i++ { go oneProducer(confirmation, proStop, produced, topic, lag) } for i := 0; i < conCnt; i++ { go oneConsumer(confirmation, conStop, consumed, topic, group) } go updateProducer(doUpdate) // Listen and report cntP := uint64(0) cntC := uint64(0) keys := keyChannel() /* go func() { rotation := []rune{'p', 'o', 'c', 'x'} cur := 0 for { time.Sleep(time.Duration(lag) * time.Second) cur = (cur + 1) % len(rotation) keys <- rotation[cur] } }() */ go func() { for { select { case c := <-produced: cntP += uint64(c) case c := <-consumed: cntC += uint64(c) } } }() for { select { case b := <-keys: globalStop := time.Now() if keyHandler(b, topic, group, lag, &proCnt, proStop, produced, &conCnt, conStop, consumed, confirmation, doUpdate) { seconds := globalStop.Sub(globalStart).Seconds() fmt.Fprintf(os.Stderr, "Produced %d in %.1f seconds, that's %.1f/second\n", cntP, seconds, float64(cntP)/seconds) if *memprofile != "" { f, err := os.Create(*memprofile) if err != nil { panic(err) } runtime.GC() if err := pprof.WriteHeapProfile(f); err != nil { panic(err) } f.Close() } return } case <-time.After(time.Second): } report(proCnt, conCnt, cntP, cntC) } } func updateProducer(doUpdate chan bool) { qw, err := qsl.NewWriter("qmp.configuration.v1") if err != nil { Log("Cannot queue an update:", err) return } qp, err := qpcl.NewProducer(qpcl.PBulk, "qmp.state.config") if err != nil { panic(err) } for { <-doUpdate t := time.Now().Second() % 2 key := "bulk" typeS := "consumer" conf := map[string]interface{}{ "consumer": map[string]interface{}{ "enable_auto_commit": false, "max_poll_records": map[string]interface{}{"int": 1}, }, "qmp": map[string]interface{}{}, } if t == 0 { key = "bulk" conf = map[string]interface{}{ "producer": map[string]interface{}{}, } typeS = "producer" } msg, err := qw.NewMessage() if err != nil { fmt.Printf("\n%v\n", err) continue } msg.Set(key+"_"+typeS, qsl.Union("qmp.envelope."+typeS, conf)) err = qp.Put(msg, nil) if err != nil { fmt.Printf("\n%v\n", err) continue } } } func keyHandler(key rune, topic, group string, lag int, proCnt *int, proStop chan rune, proCounter chan int, conCnt *int, conStop chan rune, conCounter chan int, confirm chan error, doUpdate chan bool) bool { switch key { case 'q': fmt.Fprintf(os.Stderr, "\n") for i := 0; i < *proCnt; i++ { fmt.Fprintf(os.Stderr, "\tpro %d/%d\n", i+1, *proCnt) proStop <- 'q' } for i := 0; i < *conCnt; i++ { fmt.Fprintf(os.Stderr, "\tcon %d/%d\n", i+1, *conCnt) conStop <- 'q' } for i := 0; i < *proCnt+*conCnt; i++ { <-confirm } return true case 'u': doUpdate <- true case 'c': go oneConsumer(confirm, conStop, conCounter, topic, group) *conCnt++ case 'x': if *conCnt > 0 { conStop <- 'x' *conCnt-- <-confirm } case 'p': go oneProducer(confirm, proStop, proCounter, topic, lag) *proCnt++ case 'o': if *proCnt > 0 { proStop <- 'o' *proCnt-- <-confirm } case 'g': runtime.GC() } return false } func report(producers, consumers int, produced, consumed uint64) { plen := len(fmt.Sprintf("%d", produced)) plen += plen % 3 clen := len(fmt.Sprintf("%d", consumed)) clen += clen % 3 template := fmt.Sprintf("\rProduced (%%3d): %%%dd \t\tConsumed (%%3d): %%%dd ", plen, clen) fmt.Fprintf(os.Stdout, template, producers, produced, consumers, consumed) } func oneConsumer(confirm chan<- error, stop <-chan rune, count chan int, topic, group string) { //qr, err := qsl.NewReader("qmp.payload.v1") //qr, err := qsl.NewReader("isolation-service.key-rotation.v1") qr, err := qsl.NewReader("qds.record.v1") if err != nil { panic(err) } qc, err := qpcl.NewConsumer(qpcl.CBulk, topic, group, qr, 1) if err != nil { panic(err) } err = qc.Start(func(qm qsl.Message) { fmt.Printf("\n%v\n", qm.GetAvroObject()["payload"]) select { case prev := <-count: count <- prev + 1 case count <- 1: } }) if err != nil { panic(err) } <-stop confirm <- qc.Stop() return } func oneProducer(confirm chan<- error, stop <-chan rune, count chan int, topic string, lag int) { qw, err := qsl.NewWriter("qmp.payload.v1") if err != nil { panic(err) } qp, err := qpcl.NewProducer(qpcl.PBulk, topic) if err != nil { panic(err) } for { msg, err := qw.NewMessage() if err != nil { panic(err) } if err := msg.Set("brandId", "Go"); err != nil { panic(err) } err = qp.Put(msg, func(e error, q qsl.Message) { if e != nil { fmt.Println("Cannot produce:", e) } else { select { case prev := <-count: count <- prev + 1 case count <- 1: } } }) if err != nil { panic(err) } select { case <-stop: err := qp.Close() confirm <- err return case <-time.After(time.Duration(lag) * time.Second): } } } func keyChannel() chan rune { ch := make(chan rune, 20) go func() { time.Sleep(time.Second * 5) if err := keyboard.Open(); err != nil { panic(err) } for { b, _, err := keyboard.GetKey() if err != nil { panic(err) } by := rune(b) if by == 'q' { keyboard.Close() ch <- by return } ch <- by } }() return ch }