371 lines
9.2 KiB
Go
Executable File
371 lines
9.2 KiB
Go
Executable File
// Package main is the main package.
|
|
package main
|
|
|
|
import (
|
|
"flag"
|
|
"fmt"
|
|
"log"
|
|
"math/rand"
|
|
"net/http"
|
|
_ "net/http/pprof"
|
|
"os"
|
|
"runtime"
|
|
"runtime/pprof"
|
|
"time"
|
|
|
|
"github.com/eiannone/keyboard"
|
|
metrics "github.com/tevjef/go-runtime-metrics"
|
|
qpcl "gitlab-app.eng.qops.net/golang/qmp"
|
|
qsl "gitlab-app.eng.qops.net/golang/qmp/qsl"
|
|
)
|
|
|
|
// Log logs stuff.
|
|
func Log(a ...interface{}) {
|
|
s := fmt.Sprintf("%v", a)
|
|
fmt.Fprintln(os.Stderr, s[1:len(s)-1])
|
|
}
|
|
|
|
func main() {
|
|
if _, ok := os.LookupEnv("SCHEMA_REGISTRY_URL"); !ok {
|
|
log.Println("setting schema_registry_url")
|
|
os.Setenv("SCHEMA_REGISTRY_URL", "localhost:8081")
|
|
os.Setenv("SCHEMA_REGISTRY_URL", "qmp-schema-registry.service.b1-prv.consul:8081")
|
|
}
|
|
if _, ok := os.LookupEnv("QPL_BULK_BOOTSTRAP_SERVERS"); !ok {
|
|
log.Println("setting qpl_bulk_bootstrap_servers")
|
|
os.Setenv("QPL_BULK_BOOTSTRAP_SERVERS", "localhost:9092")
|
|
os.Setenv("QPL_BULK_BOOTSTRAP_SERVERS", "core-kafka.service.b1-prv.consul:9092")
|
|
}
|
|
os.Setenv("QPL_DURABLE_BOOTSTRAP_SERVERS", os.Getenv("QPL_BULK_BOOTSTRAP_SERVERS"))
|
|
os.Setenv("QCL_BULK_BOOTSTRAP_SERVERS", os.Getenv("QPL_BULK_BOOTSTRAP_SERVERS"))
|
|
os.Setenv("QCL_RECORD_STATE_BOOTSTRAP_SERVERS", os.Getenv("QPL_BULK_BOOTSTRAP_SERVERS"))
|
|
os.Setenv("QCL_BACKGROUND_JOB_BOOTSTRAP_SERVERS", os.Getenv("QPL_BULK_BOOTSTRAP_SERVERS"))
|
|
os.Setenv("QCL_IDEMPOTENT_BACKGROUND_JOB_BOOTSTRAP_SERVERS", os.Getenv("QPL_BULK_BOOTSTRAP_SERVERS"))
|
|
fmt.Println("\n---")
|
|
fmt.Println("HELLO AND WELCOME. Ctrl-C won't save you now.")
|
|
fmt.Println("* p create another producer")
|
|
fmt.Println("* o delete a producer")
|
|
fmt.Println("* c create another consumer")
|
|
fmt.Println("* x delete a consumer")
|
|
fmt.Println("* u queue a configuration update")
|
|
fmt.Println("* q quit")
|
|
fmt.Println("---\n")
|
|
fmt.Println("--consumers n // start with n consumers")
|
|
fmt.Println("--producers n // start with n producers")
|
|
fmt.Println("--lag n // wait n seconds between messages produced")
|
|
fmt.Println("--group x // consume as group x")
|
|
fmt.Println("--topic x // producer/consume from topic x")
|
|
fmt.Println("\n---\n")
|
|
|
|
// attempt influxDB/grafana but don't care if it fails
|
|
metrics.DefaultConfig.CollectionInterval = time.Millisecond * 100
|
|
metrics.DefaultConfig.BatchInterval = time.Second * 1
|
|
if err := metrics.RunCollector(metrics.DefaultConfig); err != nil {
|
|
Log(err)
|
|
}
|
|
|
|
rand.Seed(time.Now().UTC().UnixNano())
|
|
|
|
// configuration
|
|
conCnt := 1
|
|
proCnt := 1
|
|
lag := 1
|
|
group := "gogroup"
|
|
topic := "my-topic-1"
|
|
mylogger := false
|
|
|
|
flag.IntVar(&conCnt, "consumers", conCnt, "how many consumers to start")
|
|
flag.IntVar(&proCnt, "producers", proCnt, "how many producers to start")
|
|
flag.IntVar(&lag, "lag", lag, "seconds between prdoucer put()s")
|
|
flag.StringVar(&group, "group", group, "group name")
|
|
flag.StringVar(&topic, "topic", topic, "topic name")
|
|
flag.BoolVar(&mylogger, "pychart", mylogger, "print stats to stdout")
|
|
cpuprofile := flag.String("cpuprofile", "", "write cpu profile to file")
|
|
memprofile := flag.String("memprofile", "", "write memory profile to file")
|
|
flag.Parse()
|
|
|
|
// if profiling
|
|
if *cpuprofile != "" {
|
|
f, err := os.Create(*cpuprofile)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
go func() {
|
|
Log(http.ListenAndServe("localhost:6060", nil))
|
|
}()
|
|
pprof.StartCPUProfile(f)
|
|
defer pprof.StopCPUProfile()
|
|
}
|
|
|
|
// channels
|
|
conStop := make(chan rune)
|
|
proStop := make(chan rune)
|
|
confirmation := make(chan error, 10)
|
|
consumed := make(chan int, 1000)
|
|
produced := make(chan int, 1000)
|
|
doUpdate := make(chan bool, 1)
|
|
|
|
// Register App
|
|
for err := qsl.RegisterApplication("qsl-go"); err != nil; err = qsl.RegisterApplication("qsl-go") {
|
|
Log("QSL Register failed with:", err)
|
|
time.Sleep(time.Second)
|
|
Log("Trying QSL.RegisterApplication() again")
|
|
}
|
|
for err := qpcl.RegisterApplication("qsl-go"); err != nil; err = qpcl.RegisterApplication("qsl-go") {
|
|
Log("QPCL Register failed with:", err)
|
|
time.Sleep(time.Second)
|
|
Log("Trying QPCL.RegisterApplication() again")
|
|
}
|
|
|
|
globalStart := time.Now()
|
|
|
|
// Launch
|
|
for i := 0; i < proCnt; i++ {
|
|
go oneProducer(confirmation, proStop, produced, topic, lag)
|
|
}
|
|
for i := 0; i < conCnt; i++ {
|
|
go oneConsumer(confirmation, conStop, consumed, topic, group)
|
|
}
|
|
|
|
go updateProducer(doUpdate)
|
|
|
|
// Listen and report
|
|
cntP := uint64(0)
|
|
cntC := uint64(0)
|
|
keys := keyChannel()
|
|
/*
|
|
go func() {
|
|
rotation := []rune{'p', 'o', 'c', 'x'}
|
|
cur := 0
|
|
for {
|
|
time.Sleep(time.Duration(lag) * time.Second)
|
|
cur = (cur + 1) % len(rotation)
|
|
keys <- rotation[cur]
|
|
}
|
|
}()
|
|
*/
|
|
go func() {
|
|
for {
|
|
select {
|
|
case c := <-produced:
|
|
cntP += uint64(c)
|
|
case c := <-consumed:
|
|
cntC += uint64(c)
|
|
}
|
|
}
|
|
}()
|
|
for {
|
|
select {
|
|
case b := <-keys:
|
|
globalStop := time.Now()
|
|
if keyHandler(b, topic, group, lag, &proCnt, proStop, produced, &conCnt, conStop, consumed, confirmation, doUpdate) {
|
|
seconds := globalStop.Sub(globalStart).Seconds()
|
|
fmt.Fprintf(os.Stderr, "Produced %d in %.1f seconds, that's %.1f/second\n", cntP, seconds, float64(cntP)/seconds)
|
|
if *memprofile != "" {
|
|
f, err := os.Create(*memprofile)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
runtime.GC()
|
|
if err := pprof.WriteHeapProfile(f); err != nil {
|
|
panic(err)
|
|
}
|
|
f.Close()
|
|
}
|
|
return
|
|
}
|
|
case <-time.After(time.Second):
|
|
}
|
|
report(proCnt, conCnt, cntP, cntC)
|
|
}
|
|
}
|
|
|
|
func updateProducer(doUpdate chan bool) {
|
|
qw, err := qsl.NewWriter("qmp.configuration.v1")
|
|
if err != nil {
|
|
Log("Cannot queue an update:", err)
|
|
return
|
|
}
|
|
qp, err := qpcl.NewProducer(qpcl.PBulk, "qmp.state.config")
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
for {
|
|
<-doUpdate
|
|
|
|
t := time.Now().Second() % 2
|
|
key := "bulk"
|
|
typeS := "consumer"
|
|
conf := map[string]interface{}{
|
|
"consumer": map[string]interface{}{
|
|
"enable_auto_commit": false,
|
|
"max_poll_records": map[string]interface{}{"int": 1},
|
|
},
|
|
"qmp": map[string]interface{}{},
|
|
}
|
|
if t == 0 {
|
|
key = "bulk"
|
|
conf = map[string]interface{}{
|
|
"producer": map[string]interface{}{},
|
|
}
|
|
typeS = "producer"
|
|
}
|
|
|
|
msg, err := qw.NewMessage()
|
|
if err != nil {
|
|
fmt.Printf("\n%v\n", err)
|
|
continue
|
|
}
|
|
msg.Set(key+"_"+typeS, qsl.Union("qmp.envelope."+typeS, conf))
|
|
|
|
err = qp.Put(msg, nil)
|
|
if err != nil {
|
|
fmt.Printf("\n%v\n", err)
|
|
continue
|
|
}
|
|
}
|
|
}
|
|
|
|
func keyHandler(key rune, topic, group string, lag int, proCnt *int, proStop chan rune, proCounter chan int, conCnt *int, conStop chan rune, conCounter chan int, confirm chan error, doUpdate chan bool) bool {
|
|
switch key {
|
|
case 'q':
|
|
fmt.Fprintf(os.Stderr, "\n")
|
|
for i := 0; i < *proCnt; i++ {
|
|
fmt.Fprintf(os.Stderr, "\tpro %d/%d\n", i+1, *proCnt)
|
|
proStop <- 'q'
|
|
}
|
|
for i := 0; i < *conCnt; i++ {
|
|
fmt.Fprintf(os.Stderr, "\tcon %d/%d\n", i+1, *conCnt)
|
|
conStop <- 'q'
|
|
}
|
|
for i := 0; i < *proCnt+*conCnt; i++ {
|
|
<-confirm
|
|
}
|
|
return true
|
|
case 'u':
|
|
doUpdate <- true
|
|
case 'c':
|
|
go oneConsumer(confirm, conStop, conCounter, topic, group)
|
|
*conCnt++
|
|
case 'x':
|
|
if *conCnt > 0 {
|
|
conStop <- 'x'
|
|
*conCnt--
|
|
<-confirm
|
|
}
|
|
case 'p':
|
|
go oneProducer(confirm, proStop, proCounter, topic, lag)
|
|
*proCnt++
|
|
case 'o':
|
|
if *proCnt > 0 {
|
|
proStop <- 'o'
|
|
*proCnt--
|
|
<-confirm
|
|
}
|
|
case 'g':
|
|
runtime.GC()
|
|
}
|
|
return false
|
|
}
|
|
|
|
func report(producers, consumers int, produced, consumed uint64) {
|
|
plen := len(fmt.Sprintf("%d", produced))
|
|
plen += plen % 3
|
|
clen := len(fmt.Sprintf("%d", consumed))
|
|
clen += clen % 3
|
|
template := fmt.Sprintf("\rProduced (%%3d): %%%dd \t\tConsumed (%%3d): %%%dd ", plen, clen)
|
|
fmt.Fprintf(os.Stdout, template, producers, produced, consumers, consumed)
|
|
}
|
|
|
|
func oneConsumer(confirm chan<- error, stop <-chan rune, count chan int, topic, group string) {
|
|
//qr, err := qsl.NewReader("qmp.payload.v1")
|
|
//qr, err := qsl.NewReader("isolation-service.key-rotation.v1")
|
|
qr, err := qsl.NewReader("qds.record.v1")
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
qc, err := qpcl.NewConsumer(qpcl.CBulk, topic, group, qr, 1)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
err = qc.Start(func(qm qsl.Message) {
|
|
fmt.Printf("\n%v\n", qm.GetAvroObject()["payload"])
|
|
select {
|
|
case prev := <-count:
|
|
count <- prev + 1
|
|
case count <- 1:
|
|
}
|
|
})
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
<-stop
|
|
confirm <- qc.Stop()
|
|
return
|
|
}
|
|
|
|
func oneProducer(confirm chan<- error, stop <-chan rune, count chan int, topic string, lag int) {
|
|
qw, err := qsl.NewWriter("qmp.payload.v1")
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
qp, err := qpcl.NewProducer(qpcl.PBulk, topic)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
for {
|
|
msg, err := qw.NewMessage()
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
if err := msg.Set("brandId", "Go"); err != nil {
|
|
panic(err)
|
|
}
|
|
err = qp.Put(msg, func(e error, q qsl.Message) {
|
|
if e != nil {
|
|
fmt.Println("Cannot produce:", e)
|
|
} else {
|
|
select {
|
|
case prev := <-count:
|
|
count <- prev + 1
|
|
case count <- 1:
|
|
}
|
|
}
|
|
})
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
select {
|
|
case <-stop:
|
|
err := qp.Close()
|
|
confirm <- err
|
|
return
|
|
case <-time.After(time.Duration(lag) * time.Second):
|
|
}
|
|
}
|
|
}
|
|
|
|
func keyChannel() chan rune {
|
|
ch := make(chan rune, 20)
|
|
go func() {
|
|
time.Sleep(time.Second * 5)
|
|
if err := keyboard.Open(); err != nil {
|
|
panic(err)
|
|
}
|
|
for {
|
|
b, _, err := keyboard.GetKey()
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
by := rune(b)
|
|
if by == 'q' {
|
|
keyboard.Close()
|
|
ch <- by
|
|
return
|
|
}
|
|
ch <- by
|
|
}
|
|
}()
|
|
return ch
|
|
}
|