qmp-testing-suite/golang-producer-consumer/vendor/github.com/alexcesaro/statsd/conn.go

271 lines
5.4 KiB
Go
Executable File

package statsd
import (
"io"
"math/rand"
"net"
"strconv"
"sync"
"time"
)
// conn holds the connection to the StatsD server together with the buffer of
// metrics that have been formatted but not yet written.
type conn struct {
// Fields settable with options at Client's creation.
// addr is the address handed to dialTimeout.
addr string
// errorHandler, when non-nil, receives errors returned by writes.
errorHandler func(error)
// flushPeriod is the interval of the background flush; a value <= 0 disables it.
flushPeriod time.Duration
// maxPacketSize is the buffer length above which a flush is triggered.
maxPacketSize int
// network is the network name handed to dialTimeout.
network string
// tagFormat selects where tags are placed inside a metric line.
tagFormat TagFormat
mu sync.Mutex
// Fields guarded by the mutex.
// closed is checked by the background flush goroutine, which exits once it is true.
closed bool
// w is the writer returned by dialTimeout; it stays nil for a muted conn.
w io.WriteCloser
// buf accumulates newline-terminated metric lines awaiting a flush.
buf []byte
// rateCache maps a sampling rate to its formatted text; it is created lazily.
rateCache map[float32]string
}
// newConn builds a conn from conf and, unless muted is true, dials the
// configured address, allocates the metric buffer and starts the periodic
// flush goroutine.
//
// The returned *conn is never nil, including when the error is non-nil. When
// muted is true nothing is dialed and neither the writer nor the buffer is
// set up.
func newConn(conf connConfig, muted bool) (*conn, error) {
	c := &conn{
		addr:          conf.Addr,
		errorHandler:  conf.ErrorHandler,
		flushPeriod:   conf.FlushPeriod,
		maxPacketSize: conf.MaxPacketSize,
		network:       conf.Network,
		tagFormat:     conf.TagFormat,
	}
	if muted {
		return c, nil
	}

	var err error
	c.w, err = dialTimeout(c.network, c.addr, 5*time.Second)
	if err != nil {
		return c, err
	}

	// When using UDP do a quick check to see if something is listening on the
	// given port to return an error as soon as possible.
	// The length guard keeps the prefix check from panicking on a network
	// name shorter than three bytes.
	if len(c.network) >= 3 && c.network[:3] == "udp" {
		for i := 0; i < 2; i++ {
			_, err = c.w.Write(nil)
			if err != nil {
				// Best-effort close: the write error is the one reported.
				_ = c.w.Close()
				c.w = nil
				return c, err
			}
		}
	}

	// To prevent a buffer overflow add some capacity to the buffer to allow for
	// an additional metric.
	c.buf = make([]byte, 0, c.maxPacketSize+200)

	if c.flushPeriod > 0 {
		go func() {
			ticker := time.NewTicker(c.flushPeriod)
			// Stop the ticker on exit, which happens once the conn is closed.
			defer ticker.Stop()

			for range ticker.C {
				c.mu.Lock()
				if c.closed {
					c.mu.Unlock()
					return
				}
				c.flush(0)
				c.mu.Unlock()
			}
		}()
	}

	return c, nil
}
// metric appends one metric line to the buffer and flushes the previously
// buffered lines if the buffer has grown past maxPacketSize.
//
// The line is assembled from prefix and bucket, the value n, the type typ,
// the sampling rate (omitted when it is 1) and tags, whose position depends
// on the configured tag format.
//
// The mutex is released with defer so that a panic raised while flushing
// (for example inside the user-supplied error handler) does not leave the
// conn permanently locked.
func (c *conn) metric(prefix, bucket string, n interface{}, typ string, rate float32, tags string) {
	c.mu.Lock()
	defer c.mu.Unlock()

	// Length before this metric: everything up to here is safe to send.
	l := len(c.buf)
	c.appendBucket(prefix, bucket, tags)
	c.appendNumber(n)
	c.appendType(typ)
	c.appendRate(rate)
	c.closeMetric(tags)
	c.flushIfBufferFull(l)
}
// gauge appends a gauge line for value to the buffer and flushes the
// previously buffered lines if the buffer has grown past maxPacketSize.
//
// For a negative value an extra line setting the gauge to 0 is written first.
//
// The mutex is released with defer so that a panic raised while flushing
// (for example inside the user-supplied error handler) does not leave the
// conn permanently locked.
func (c *conn) gauge(prefix, bucket string, value interface{}, tags string) {
	c.mu.Lock()
	defer c.mu.Unlock()

	// Length before this gauge: everything up to here is safe to send.
	l := len(c.buf)
	// To set a gauge to a negative value we must first set it to 0.
	// https://github.com/etsy/statsd/blob/master/docs/metric_types.md#gauges
	if isNegative(value) {
		c.appendBucket(prefix, bucket, tags)
		c.appendGauge(0, tags)
	}
	c.appendBucket(prefix, bucket, tags)
	c.appendGauge(value, tags)
	c.flushIfBufferFull(l)
}
// appendGauge writes the value part of a gauge line: the number, the "|g"
// type marker, and the line terminator (with tags when the tag format places
// them at the end of the line).
func (c *conn) appendGauge(value interface{}, tags string) {
	c.appendNumber(value)
	c.buf = append(c.buf, '|')
	c.buf = append(c.buf, "g"...)
	c.closeMetric(tags)
}
// unique appends a set ("s") metric line carrying the string value to the
// buffer and flushes the previously buffered lines if the buffer has grown
// past maxPacketSize.
//
// The mutex is released with defer so that a panic raised while flushing
// (for example inside the user-supplied error handler) does not leave the
// conn permanently locked.
func (c *conn) unique(prefix, bucket string, value string, tags string) {
	c.mu.Lock()
	defer c.mu.Unlock()

	// Length before this metric: everything up to here is safe to send.
	l := len(c.buf)
	c.appendBucket(prefix, bucket, tags)
	c.appendString(value)
	c.appendType("s")
	c.closeMetric(tags)
	c.flushIfBufferFull(l)
}
// appendByte appends the single byte b to the pending metric buffer.
// It does no locking itself; the callers visible in this file reach it with
// c.mu already held.
func (c *conn) appendByte(b byte) {
c.buf = append(c.buf, b)
}
// appendString appends the bytes of s to the pending metric buffer.
// It does no locking itself; the callers visible in this file reach it with
// c.mu already held.
func (c *conn) appendString(s string) {
c.buf = append(c.buf, s...)
}
// appendNumber appends the decimal text of v to the buffer.
//
// Signed and unsigned integers of every width are written in base 10; floats
// are written in 'f' format with the shortest precision that round-trips for
// their bit size. A value of any other type appends nothing.
func (c *conn) appendNumber(v interface{}) {
	switch x := v.(type) {
	// Signed integers.
	case int:
		c.buf = strconv.AppendInt(c.buf, int64(x), 10)
	case int8:
		c.buf = strconv.AppendInt(c.buf, int64(x), 10)
	case int16:
		c.buf = strconv.AppendInt(c.buf, int64(x), 10)
	case int32:
		c.buf = strconv.AppendInt(c.buf, int64(x), 10)
	case int64:
		c.buf = strconv.AppendInt(c.buf, x, 10)

	// Unsigned integers.
	case uint:
		c.buf = strconv.AppendUint(c.buf, uint64(x), 10)
	case uint8:
		c.buf = strconv.AppendUint(c.buf, uint64(x), 10)
	case uint16:
		c.buf = strconv.AppendUint(c.buf, uint64(x), 10)
	case uint32:
		c.buf = strconv.AppendUint(c.buf, uint64(x), 10)
	case uint64:
		c.buf = strconv.AppendUint(c.buf, x, 10)

	// Floating point.
	case float32:
		c.buf = strconv.AppendFloat(c.buf, float64(x), 'f', -1, 32)
	case float64:
		c.buf = strconv.AppendFloat(c.buf, x, 'f', -1, 64)
	}
}
// isNegative reports whether v is a signed integer or floating-point value
// strictly below zero.
//
// Unsigned integers can never be negative, so they are not compared at all
// and, like every unsupported type, yield false.
func isNegative(v interface{}) bool {
	switch n := v.(type) {
	case int:
		return n < 0
	case int64:
		return n < 0
	case int32:
		return n < 0
	case int16:
		return n < 0
	case int8:
		return n < 0
	case float64:
		return n < 0
	case float32:
		return n < 0
	}
	return false
}
// appendBucket starts a metric line: it writes prefix and bucket, then tags
// when the tag format is InfluxDB, and finally the ':' that separates the
// name from the value.
func (c *conn) appendBucket(prefix, bucket string, tags string) {
	c.buf = append(c.buf, prefix...)
	c.buf = append(c.buf, bucket...)
	if c.tagFormat == InfluxDB {
		c.buf = append(c.buf, tags...)
	}
	c.buf = append(c.buf, ':')
}
// appendType writes the '|' separator followed by the metric type t.
func (c *conn) appendType(t string) {
	c.buf = append(c.buf, '|')
	c.buf = append(c.buf, t...)
}
// appendRate writes the sampling-rate suffix "|@<rate>" to the buffer.
//
// A rate of exactly 1 writes nothing. The text of each distinct rate is
// formatted once and kept in c.rateCache, which is created on first use.
func (c *conn) appendRate(rate float32) {
	if rate == 1 {
		return
	}
	if c.rateCache == nil {
		c.rateCache = make(map[float32]string)
	}
	c.appendString("|@")

	text, cached := c.rateCache[rate]
	if !cached {
		text = strconv.FormatFloat(float64(rate), 'f', -1, 32)
		c.rateCache[rate] = text
	}
	c.appendString(text)
}
// closeMetric ends a metric line: it writes tags when the tag format is
// Datadog, then the terminating newline.
func (c *conn) closeMetric(tags string) {
	if c.tagFormat == Datadog {
		c.buf = append(c.buf, tags...)
	}
	c.buf = append(c.buf, '\n')
}
// flushIfBufferFull flushes the first lastSafeLen bytes of the buffer when
// the buffer is longer than maxPacketSize; otherwise it does nothing.
// lastSafeLen is the buffer length recorded before the latest metric was
// appended.
func (c *conn) flushIfBufferFull(lastSafeLen int) {
	if len(c.buf) <= c.maxPacketSize {
		return
	}
	c.flush(lastSafeLen)
}
// flush writes the first n bytes of the buffer, minus their trailing
// newline, to the underlying writer and keeps whatever follows them.
// An n of 0 means the whole buffer. An empty buffer is a no-op.
// A write error is passed to handleError; the flushed bytes are dropped from
// the buffer either way.
func (c *conn) flush(n int) {
	if len(c.buf) == 0 {
		return
	}
	if n == 0 {
		n = len(c.buf)
	}

	// Trim the last \n, StatsD does not like it.
	_, err := c.w.Write(c.buf[:n-1])
	c.handleError(err)

	// Slide the unsent tail to the front; copy reports how many bytes remain.
	kept := copy(c.buf, c.buf[n:])
	c.buf = c.buf[:kept]
}
// handleError passes err to the configured error handler. It does nothing
// when err is nil or when no handler was configured.
func (c *conn) handleError(err error) {
	if err == nil || c.errorHandler == nil {
		return
	}
	c.errorHandler(err)
}
// Stubbed out for testing.
// These package-level variables alias standard-library functions so that
// tests can replace them.
var (
// dialTimeout is called by newConn to open the connection.
dialTimeout = net.DialTimeout
// now aliases time.Now.
// NOTE(review): not referenced in the visible part of this file; presumably used elsewhere in the package.
now = time.Now
// randFloat aliases rand.Float32.
// NOTE(review): not referenced in the visible part of this file; presumably used elsewhere in the package.
randFloat = rand.Float32
)