// Package qc supplies a Kafka consumer client compliant // with the QPCL specification. package qc import ( "fmt" "strconv" "strings" "sync" "time" "gitlab-app.eng.qops.net/golang/metrics" "github.com/Shopify/sarama" "github.com/bsm/sarama-cluster" "gitlab-app.eng.qops.net/golang/qmp/internal/config" "gitlab-app.eng.qops.net/golang/qmp/internal/types" "gitlab-app.eng.qops.net/golang/qmp/qsl" "gitlab-app.eng.qops.net/golang/qmp/qsl/logger" ) // Consumer implements the qpcl.Consumer interface. type Consumer struct { static consumerStatic listenConf listener listenerCount int closed bool reporter metrics.Reporter reader qsl.Reader startStop []sync.Once updateLock *sync.Mutex } type listener struct { config *cluster.Config maxLag time.Duration } type consumerStatic struct { batchSize int clientID string threads int brokers []string group string topic []string qpcType types.QPCType stop chan bool exit chan error updates chan clusterConsumer updateConfirmation chan error } // clusterConsumer implemented by the cluster.Consumer struct. type clusterConsumer interface { Messages() <-chan *sarama.ConsumerMessage Notifications() <-chan *cluster.Notification Errors() <-chan error MarkOffset(*sarama.ConsumerMessage, string) CommitOffsets() error Close() error } const defaultMaxLag = time.Duration(5 * time.Second) // NewConsumer creates a Consumer ready to receive messages from the // topic within the specified group after a .Start() call. func NewConsumer(topic, group string, reader qsl.Reader, configuration map[string]string, reporter metrics.Reporter, qpcType types.QPCType) (*Consumer, error) { return newConsumer([]string{topic}, group, reader, configuration, reporter, qpcType) } // NewStateTopicConsumer creates a Record State Consumer that receives messages // about topic class configuration and throttling updates after a .Start() // call. 
func NewStateTopicConsumer(group string, configuration map[string]string, reporter metrics.Reporter) (*Consumer, error) { reader, err := qsl.NewReader(config.SchemaIDConfiguration) if err != nil { return nil, err } return newConsumer([]string{config.TopicStateConfig, config.TopicStateThrottle}, group, reader, configuration, reporter, types.CRecordState) } func newConsumer(topic []string, group string, reader qsl.Reader, configuration map[string]string, reporter metrics.Reporter, qpcType types.QPCType) (*Consumer, error) { listenConf, static, err := parseConfiguration(configuration, topic, group) if err != nil { return nil, err } static.qpcType = qpcType static.stop = make(chan bool, static.threads) static.exit = make(chan error, static.threads) static.updates = make(chan clusterConsumer) static.updateConfirmation = make(chan error, static.threads) return &Consumer{ static: *static, listenConf: *listenConf, listenerCount: 0, closed: false, reporter: reporter, reader: reader, startStop: []sync.Once{sync.Once{}, sync.Once{}}, updateLock: &sync.Mutex{}, }, nil } // Start will launch goroutines for each of c.threads // consumers of the Consumer's topic and group. 
func (qc *Consumer) Start(handler func(qsl.Message)) error { outerErr := ErrStartOnce qc.startStop[0].Do(func() { qc.updateLock.Lock() defer qc.updateLock.Unlock() if qc.IsStopped() { return } outerErr = nil for i := 0; i < qc.static.threads; i++ { newConsumer, err := qc.makeConsumer(qc.static.clientID, i) if err != nil { outerErr = err return } go qc.listen(handler) select { case qc.static.updates <- newConsumer: case <-time.After(time.Second * 5): outerErr = ErrListenerDidntNew newConsumer.Close() return } } qc.listenerCount = qc.static.threads }) return outerErr } func (qc *Consumer) listen(handler func(qsl.Message)) { updates := qc.static.updates confirmation := qc.static.updateConfirmation stop := qc.static.stop exit := qc.static.exit kafkaConsumer := <-updates offsetsUncommitted := 0 for { start := time.Now() select { case msg := <-kafkaConsumer.Messages(): qc.reportPollTime(start) kafkaConsumer.MarkOffset(msg, "") qm, err := qc.reader.NewMessage(msg.Value) if err != nil { qc.reportDeserializationError(err, msg.Partition, msg.Offset) continue } if put, err := qm.GetTimestamp(); err == nil { putTime := time.Unix(0, put*int64(time.Millisecond/time.Nanosecond)) lapse := time.Now().Sub(putTime) if lapse > qc.listenConf.maxLag { logger.Warn(lapse.Seconds(), "seconds between Kafka event and consumption") } } if handler != nil { handler(qm) } offsetsUncommitted++ if offsetsUncommitted >= qc.static.batchSize { offsetsUncommitted = 0 kafkaConsumer.CommitOffsets() } qc.reportMessageHandled(int64(len(msg.Value))) case newKafkaConsumer := <-updates: <-confirmation kafkaConsumer.CommitOffsets() kafkaConsumer.Close() kafkaConsumer = newKafkaConsumer case <-stop: kafkaConsumer.CommitOffsets() exit <- kafkaConsumer.Close() return } } } func (qc *Consumer) reportPollTime(start time.Time) { qc.reporter.RecordTiming("execution.poll.time", time.Since(start), metrics.Tag("measurement", "function"), metrics.Tag("topic", qc.GetTopic()), metrics.Tag("consumerGroup", 
qc.static.group), metrics.Tag("consumerID", qc.static.clientID), ) } func (qc *Consumer) reportDeserializationError(err error, partition int32, offset int64) { partitionStr := strconv.FormatInt(int64(partition), 10) offsetStr := strconv.FormatInt(offset, 10) qc.reporter.IncCounter("execution.deserialization.errors", metrics.Tag("topic", qc.GetTopic()), metrics.Tag("partition", partitionStr), metrics.Tag("consumerGroup", strings.ToLower(qc.static.group)), metrics.Tag("consumerID", qc.static.clientID), ) logger.Warn(fmt.Sprintf("deserialization error: %v, %v, %v, %v, %v", metrics.Tag("topic", qc.GetTopic()), metrics.Tag("partition", partitionStr), metrics.Tag("offset", offsetStr), metrics.Tag("consumerGroup", strings.ToLower(qc.static.group)), metrics.Tag("consumerID", qc.static.clientID), )) } func (qc *Consumer) reportMessageHandled(messageBytes int64) { qc.reporter.AddToCounter( "execution.poll.bytes", messageBytes, metrics.Tag("measurement", "function"), metrics.Tag("topic", qc.GetTopic()), metrics.Tag("consumerGroup", qc.static.group), metrics.Tag("consumerID", qc.static.clientID), ) qc.reporter.IncCounter( "execution.poll.records", metrics.Tag("measurement", "function"), metrics.Tag("topic", qc.GetTopic()), metrics.Tag("consumerGroup", qc.static.group), metrics.Tag("consumerID", qc.static.clientID), ) } // Update will take the consumer and return a consumer // with an updated configuration for each active // listener. 
func (qc *Consumer) Update(configuration map[string]string) error { configuration[config.ClientID] = qc.listenConf.config.ClientID listenConf, static, err := parseConfiguration(configuration, qc.static.topic, qc.static.group) if err != nil { return err } qc.updateLock.Lock() defer qc.updateLock.Unlock() if qc.IsStopped() { return ErrUpdateClosed } // for each listener, build a new consumer and send it with timeout for i := 0; i < qc.listenerCount; i++ { var newConsumer clusterConsumer newConsumer, err = qc.makeConsumer(qc.static.clientID, i) if err != nil { return err } select { case qc.static.updates <- newConsumer: case <-time.After(time.Second * 5): err = ErrListenerDidntUpdate newConsumer.Close() qc.listenerCount-- } } if err == nil { qc.static.brokers = static.brokers qc.listenConf = *listenConf } // for each listener, release to listen again for i := 0; i < qc.listenerCount; i++ { qc.static.updateConfirmation <- err } return err } func (qc *Consumer) makeConsumer(clientID string, index int) (clusterConsumer, error) { conf := *qc.listenConf.config listenConf := listener{ maxLag: qc.listenConf.maxLag, config: &conf, } listenConf.config.ClientID = fmt.Sprintf("%s-thread%d", clientID, index) return cluster.NewConsumer(qc.static.brokers, qc.static.group, qc.static.topic, listenConf.config) } // Stop will tell its active goroutine consumers // to stop and clean up. func (qc *Consumer) Stop() error { outerErr := ErrStopOnce qc.startStop[1].Do(func() { if qc.IsStopped() { return } outerErr = nil qc.updateLock.Lock() defer qc.updateLock.Unlock() for i := 0; i < qc.listenerCount; i++ { qc.static.stop <- true } for i := 0; i < qc.listenerCount; i++ { select { case err := <-qc.static.exit: if err != nil { outerErr = err } case <-time.After(time.Second * 4): outerErr = ErrListenerDidntExit } } qc.listenerCount = 0 qc.closed = true }) return outerErr } // Join asserts the Consumer has been closed. 
func (qc *Consumer) Join() error { qc.updateLock.Lock() defer qc.updateLock.Unlock() if !qc.IsStopped() { return ErrJoinWithoutStop } return nil } // IsStopped returns whether the consumer has // been closed. func (qc *Consumer) IsStopped() bool { return qc.closed } // GetTopic returns the topic of the Consumer. func (qc *Consumer) GetTopic() string { return strings.Join(qc.static.topic, ",") } // GetType returns the QPCType of the // Consumer. func (qc *Consumer) GetType() types.QPCType { return qc.static.qpcType } func parseConfiguration(configuration map[string]string, topic []string, group string) (*listener, *consumerStatic, error) { params, err := config.GetMandatoryParams(configuration, topic, group) if err != nil { return nil, nil, err } // At least 1 thread if specified threads := 1 if confThreads, ok := configuration[config.QMPThreads]; ok { var err error threads, err = strconv.Atoi(confThreads) if err != nil { return nil, nil, err } if threads < 1 { return nil, nil, ErrNaturalThreads } } // Maximum lag without logging must be valid duration maxLag := defaultMaxLag if confLag, ok := configuration[config.MaxLagMS]; ok { var err error maxLag, err = time.ParseDuration(confLag) if err != nil { s, err := strconv.ParseInt(confLag, 10, 64) if err != nil { return nil, nil, err } maxLag = time.Duration(s) * time.Millisecond } } // batch.size for committing offsets batchSizeS, ok := configuration[config.MaxPollRecords] if !ok { batchSizeS = "500" } batchSize, err := strconv.ParseInt(batchSizeS, 10, 16) if err != nil { return nil, nil, ErrNoBatchSize } // The rest of the owl config, err := config.ToClusterConfig(configuration) if err != nil { return nil, nil, err } return &listener{ config: config, maxLag: maxLag, }, &consumerStatic{ clientID: params.ClientID, brokers: params.Brokers, threads: threads, group: params.Group, topic: params.Topic, batchSize: int(batchSize), }, nil } // GetClientID returns the clientID of the consumer. 
func (qc *Consumer) GetClientID() string {
	static := &qc.static
	return static.clientID
}

// IsClosed fulfills the updatable interface in client.go by reporting the
// same value as IsStopped.
func (qc *Consumer) IsClosed() bool {
	stopped := qc.IsStopped()
	return stopped
}