qmp-testing-suite/golang-producer-consumer/vendor/github.com/tevjef/go-runtime-metrics/runstats.go

261 lines
5.4 KiB
Go
Executable File

package runstats
import (
"log"
"os"
"time"
"fmt"
"github.com/influxdata/influxdb/client/v2"
"github.com/pkg/errors"
"github.com/tevjef/go-runtime-metrics/collector"
)
const (
defaultHost = "localhost:8086"
defaultMeasurement = "go.runtime"
defaultDatabase = "stats"
defaultCollectionInterval = 10 * time.Second
defaultBatchInterval = 60 * time.Second
)
// A configuration with default values.
var DefaultConfig = &Config{}
type Config struct {
// InfluxDb host:port pair.
// Default is "localhost:8086".
Host string
// Database to write points to.
// Default is "stats" and is auto created
Database string
// Username with privileges on provided database.
Username string
// Password for provided user.
Password string
// Measurement to write points to.
// Default is "go.runtime.<hostname>".
Measurement string
// Measurement to write points to.
RetentionPolicy string
// Interval at which to write batched points to InfluxDB.
// Default is 60 seconds
BatchInterval time.Duration
// Precision in time to write your points in.
// Default is nanoseconds
Precision string
// Interval at which to collect points.
// Default is 10 seconds
CollectionInterval time.Duration
// Disable collecting CPU Statistics. cpu.*
// Default is false
DisableCpu bool
// Disable collecting Memory Statistics. mem.*
DisableMem bool
// Disable collecting GC Statistics (requires Memory be not be disabled). mem.gc.*
DisableGc bool
// Default is DefaultLogger which exits when the library encounters a fatal error.
Logger Logger
}
func (config *Config) init() (*Config, error) {
if config == nil {
config = DefaultConfig
}
if config.Database == "" {
config.Database = defaultDatabase
}
if config.Host == "" {
config.Host = defaultHost
}
if config.Measurement == "" {
config.Measurement = defaultMeasurement
if hn, err := os.Hostname(); err != nil {
config.Measurement += ".unknown"
} else {
config.Measurement += "." + hn
}
}
if config.CollectionInterval == 0 {
config.CollectionInterval = defaultCollectionInterval
}
if config.BatchInterval == 0 {
config.BatchInterval = defaultBatchInterval
}
if config.Logger == nil {
config.Logger = &DefaultLogger{}
}
return config, nil
}
func RunCollector(config *Config) (err error) {
if config, err = config.init(); err != nil {
return err
}
// Make client
clnt, err := client.NewHTTPClient(client.HTTPConfig{
Addr: "http://" + config.Host,
Username: config.Username,
Password: config.Password,
})
if err != nil {
return errors.Wrap(err, "failed to create influxdb client")
}
// Ping InfluxDB to ensure there is a connection
if _, _, err := clnt.Ping(5 * time.Second); err != nil {
return errors.Wrap(err, "failed to ping influxdb client")
}
// Auto create database
_, err = queryDB(clnt, fmt.Sprintf("CREATE DATABASE \"%s\"", config.Database))
if err != nil {
config.Logger.Fatalln(err)
}
_runStats := &runStats{
logger: config.Logger,
client: clnt,
config: config,
pc: make(chan *client.Point),
}
bp, err := _runStats.newBatch()
if err != nil {
return err
}
_runStats.points = bp
go _runStats.loop(config.BatchInterval)
_collector := collector.New(_runStats.onNewPoint)
_collector.PauseDur = config.CollectionInterval
_collector.EnableCPU = !config.DisableCpu
_collector.EnableMem = !config.DisableMem
_collector.EnableGC = !config.DisableGc
go _collector.Run()
return nil
}
type runStats struct {
logger Logger
client client.Client
points client.BatchPoints
config *Config
pc chan *client.Point
}
func (r *runStats) onNewPoint(fields collector.Fields) {
pt, err := client.NewPoint(r.config.Measurement, fields.Tags(), fields.Values(), time.Now())
if err != nil {
r.logger.Fatalln(errors.Wrap(err, "error while creating point"))
}
r.pc <- pt
}
func (r *runStats) newBatch() (bp client.BatchPoints, err error) {
bp, err = client.NewBatchPoints(client.BatchPointsConfig{
Database: r.config.Database,
Precision: r.config.Precision,
RetentionPolicy: r.config.RetentionPolicy,
})
if err != nil {
r.logger.Fatalln(errors.Wrap(err, "could not create BatchPoints"))
}
return
}
// Write collected points to influxdb periodically
func (r *runStats) loop(interval time.Duration) {
ticks := time.Tick(interval)
for {
select {
case <-ticks:
if r.points == nil || len(r.points.Points()) <= 0 {
continue
}
if err := r.client.Write(r.points); err != nil {
r.logger.Fatalln(errors.Wrap(err, "could not write points to InfluxDB"))
continue
}
r.points = nil
bp, err := r.newBatch()
if err != nil {
r.logger.Fatalln(errors.Wrap(err, "could not create BatchPoints"))
continue
}
r.points = bp
case pt := <-r.pc:
if r.points != nil {
r.logger.Println(pt.String())
r.points.AddPoint(pt)
}
}
}
}
type Logger interface {
Println(v ...interface{})
Fatalln(v ...interface{})
}
type DefaultLogger struct{}
func (*DefaultLogger) Println(v ...interface{}) {}
func (*DefaultLogger) Fatalln(v ...interface{}) { log.Fatalln(v) }
func queryDB(clnt client.Client, cmd string) (res []client.Result, err error) {
q := client.Query{
Command: cmd,
}
if response, err := clnt.Query(q); err == nil {
if response.Error() != nil {
return res, response.Error()
}
res = response.Results
} else {
return res, err
}
return res, nil
}