From 8b009ee83fde9c17fe2a7a194c191cea2fc00b2b Mon Sep 17 00:00:00 2001 From: woodsaj Date: Wed, 25 Jan 2017 17:25:01 +0800 Subject: [PATCH 1/4] update raintank/schema.v1 to the latest minor release --- .../gopkg.in/raintank/schema.v1/event_gen.go | 83 +++-- vendor/gopkg.in/raintank/schema.v1/metric.go | 82 +++-- .../gopkg.in/raintank/schema.v1/metric_gen.go | 294 +++++++----------- vendor/gopkg.in/raintank/schema.v1/point.go | 1 + .../gopkg.in/raintank/schema.v1/point_gen.go | 125 ++++++++ vendor/vendor.json | 10 +- 6 files changed, 338 insertions(+), 257 deletions(-) create mode 100644 vendor/gopkg.in/raintank/schema.v1/point_gen.go diff --git a/vendor/gopkg.in/raintank/schema.v1/event_gen.go b/vendor/gopkg.in/raintank/schema.v1/event_gen.go index 687d2a9f..c9c23bc8 100644 --- a/vendor/gopkg.in/raintank/schema.v1/event_gen.go +++ b/vendor/gopkg.in/raintank/schema.v1/event_gen.go @@ -4,21 +4,19 @@ package schema // MSGP CODE GENERATION TOOL (github.com/tinylib/msgp) // DO NOT EDIT -import ( - "github.com/tinylib/msgp/msgp" -) +import "github.com/tinylib/msgp/msgp" // DecodeMsg implements msgp.Decodable func (z *ProbeEvent) DecodeMsg(dc *msgp.Reader) (err error) { var field []byte _ = field - var isz uint32 - isz, err = dc.ReadMapHeader() + var zbai uint32 + zbai, err = dc.ReadMapHeader() if err != nil { return } - for isz > 0 { - isz-- + for zbai > 0 { + zbai-- field, err = dc.ReadMapKeyPtr() if err != nil { return @@ -60,31 +58,31 @@ func (z *ProbeEvent) DecodeMsg(dc *msgp.Reader) (err error) { return } case "Tags": - var msz uint32 - msz, err = dc.ReadMapHeader() + var zcmr uint32 + zcmr, err = dc.ReadMapHeader() if err != nil { return } - if z.Tags == nil && msz > 0 { - z.Tags = make(map[string]string, msz) + if z.Tags == nil && zcmr > 0 { + z.Tags = make(map[string]string, zcmr) } else if len(z.Tags) > 0 { for key, _ := range z.Tags { delete(z.Tags, key) } } - for msz > 0 { - msz-- - var xvk string - var bzg string - xvk, err = dc.ReadString() + for zcmr > 0 { + zcmr-- + var zxvk string + var zbzg string + zxvk, err = dc.ReadString() if err != nil { return } - bzg, err = dc.ReadString() + zbzg, err = dc.ReadString() if err != nil { return } - z.Tags[xvk] = bzg + z.Tags[zxvk] = zbzg } default: err = dc.Skip() @@ -171,12 +169,12 @@ func (z *ProbeEvent) EncodeMsg(en *msgp.Writer) (err error) { if err != nil { return } - for xvk, bzg := range z.Tags { - err = en.WriteString(xvk) + for zxvk, zbzg := range z.Tags { + err = en.WriteString(zxvk) if err != nil { return } - err = en.WriteString(bzg) + err = en.WriteString(zbzg) if err != nil { return } @@ -212,9 +210,9 @@ func (z *ProbeEvent) MarshalMsg(b []byte) (o []byte, err error) { // string "Tags" o = append(o, 0xa4, 0x54, 0x61, 0x67, 0x73) o = msgp.AppendMapHeader(o, uint32(len(z.Tags))) - for xvk, bzg := range z.Tags { - o = msgp.AppendString(o, xvk) - o = msgp.AppendString(o, bzg) + for zxvk, zbzg := range z.Tags { + o = msgp.AppendString(o, zxvk) + o = msgp.AppendString(o, zbzg) } return } @@ -223,13 +221,13 @@ func (z *ProbeEvent) MarshalMsg(b []byte) (o []byte, err error) { func (z *ProbeEvent) UnmarshalMsg(bts []byte) (o []byte, err error) { var field []byte _ = field - var isz uint32 - isz, bts, err = msgp.ReadMapHeaderBytes(bts) + var zajw uint32 + zajw, bts, err = msgp.ReadMapHeaderBytes(bts) if err != nil { return } - for isz > 0 { - isz-- + for zajw > 0 { + zajw-- field, bts, err = msgp.ReadMapKeyZC(bts) if err != nil { return @@ -271,31 +269,31 @@ func (z *ProbeEvent) UnmarshalMsg(bts []byte) (o []byte, err error) { return } case "Tags": - var msz uint32 - msz, bts, err = msgp.ReadMapHeaderBytes(bts) + var zwht uint32 + zwht, bts, err = msgp.ReadMapHeaderBytes(bts) if err != nil { return } - if z.Tags == nil && msz > 0 { - z.Tags = make(map[string]string, msz) + if z.Tags == nil && zwht > 0 { + z.Tags = make(map[string]string, zwht) } else if len(z.Tags) > 0 { for key, _ := range z.Tags { delete(z.Tags, key) } } - for msz > 0 { - var xvk string - var bzg string - msz-- - xvk, bts, err = msgp.ReadStringBytes(bts) + for zwht > 0 { + var zxvk string + var zbzg string + zwht-- + zxvk, bts, err = msgp.ReadStringBytes(bts) if err != nil { return } - bzg, bts, err = msgp.ReadStringBytes(bts) + zbzg, bts, err = msgp.ReadStringBytes(bts) if err != nil { return } - z.Tags[xvk] = bzg + z.Tags[zxvk] = zbzg } default: bts, err = msgp.Skip(bts) @@ -308,12 +306,13 @@ func (z *ProbeEvent) UnmarshalMsg(bts []byte) (o []byte, err error) { return } +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message func (z *ProbeEvent) Msgsize() (s int) { s = 1 + 3 + msgp.StringPrefixSize + len(z.Id) + 10 + msgp.StringPrefixSize + len(z.EventType) + 6 + msgp.Int64Size + 9 + msgp.StringPrefixSize + len(z.Severity) + 7 + msgp.StringPrefixSize + len(z.Source) + 10 + msgp.Int64Size + 8 + msgp.StringPrefixSize + len(z.Message) + 5 + msgp.MapHeaderSize if z.Tags != nil { - for xvk, bzg := range z.Tags { - _ = bzg - s += msgp.StringPrefixSize + len(xvk) + msgp.StringPrefixSize + len(bzg) + for zxvk, zbzg := range z.Tags { + _ = zbzg + s += msgp.StringPrefixSize + len(zxvk) + msgp.StringPrefixSize + len(zbzg) } } return diff --git a/vendor/gopkg.in/raintank/schema.v1/metric.go b/vendor/gopkg.in/raintank/schema.v1/metric.go index ba76efdf..85473244 100644 --- a/vendor/gopkg.in/raintank/schema.v1/metric.go +++ b/vendor/gopkg.in/raintank/schema.v1/metric.go @@ -3,11 +3,11 @@ package schema import ( "bytes" "crypto/md5" + "encoding/binary" "encoding/json" "errors" "fmt" "sort" - "strings" ) var errInvalidIntervalzero = errors.New("interval cannot be 0") @@ -16,6 +16,19 @@ var errInvalidEmptyName = errors.New("name cannot be empty") var errInvalidEmptyMetric = errors.New("metric cannot be empty") var errInvalidMtype = errors.New("invalid mtype") +type PartitionedMetric interface { + Validate() error + SetId() + // return a []byte key comprised of the metric's OrgId + // accepts an input []byte to allow callers to re-use + // buffers to reduce memory allocations + KeyByOrgId([]byte) []byte + // return a []byte key comprised of the metric's Name + // accepts an input []byte to allow callers to re-use + // buffers to reduce memory allocations + KeyBySeries([]byte) []byte +} + //go:generate msgp // MetricData contains all metric metadata (some as fields, some as tags) and a datapoint @@ -51,6 +64,25 @@ func (m *MetricData) Validate() error { return nil } +func (m *MetricData) KeyByOrgId(b []byte) []byte { + if cap(b)-len(b) < 4 { + // not enough unused space in the slice so we need to grow it. + newBuf := make([]byte, len(b), len(b)+4) + copy(newBuf, b) + b = newBuf + } + // PutUint32 writes directly to the slice rather then appending. + // so we need to set the length to 4 more bytes then it currently is. + b = b[:len(b)+4] + binary.LittleEndian.PutUint32(b[len(b)-4:], uint32(m.OrgId)) + return b +} + +func (m *MetricData) KeyBySeries(b []byte) []byte { + b = append(b, []byte(m.Name)...) + return b +} + // returns a id (hash key) in the format OrgId.md5Sum // the md5sum is a hash of the the concatination of the // metric + each tag key:value pair (in metrics2.0 sense, so also fields), sorted alphabetically. @@ -77,17 +109,16 @@ type MetricDataArray []*MetricData // for ES type MetricDefinition struct { - Id string `json:"id"` - OrgId int `json:"org_id"` - Name string `json:"name" elastic:"type:string,index:not_analyzed"` // graphite format - Metric string `json:"metric"` // kairosdb format (like graphite, but not including some tags) - Interval int `json:"interval"` // minimum 10 - Unit string `json:"unit"` - Mtype string `json:"mtype"` - Tags []string `json:"tags" elastic:"type:string,index:not_analyzed"` - LastUpdate int64 `json:"lastUpdate"` // unix timestamp - Nodes map[string]string `json:"nodes"` - NodeCount int `json:"node_count"` + Id string `json:"id"` + OrgId int `json:"org_id"` + Name string `json:"name" elastic:"type:string,index:not_analyzed"` // graphite format + Metric string `json:"metric"` // kairosdb format (like graphite, but not including some tags) + Interval int `json:"interval"` // minimum 10 + Unit string `json:"unit"` + Mtype string `json:"mtype"` + Tags []string `json:"tags" elastic:"type:string,index:not_analyzed"` + LastUpdate int64 `json:"lastUpdate"` // unix timestamp + Partition int32 `json:"partition"` } func (m *MetricDefinition) SetId() { @@ -127,6 +158,25 @@ func (m *MetricDefinition) Validate() error { return nil } +func (m *MetricDefinition) KeyByOrgId(b []byte) []byte { + if cap(b)-len(b) < 4 { + // not enough unused space in the slice so we need to grow it. + newBuf := make([]byte, len(b), len(b)+4) + copy(newBuf, b) + b = newBuf + } + // PutUint32 writes directly to the slice rather then appending. + // so we need to set the length to 4 more bytes then it currently is. + b = b[:len(b)+4] + binary.LittleEndian.PutUint32(b[len(b)-4:], uint32(m.OrgId)) + return b +} + +func (m *MetricDefinition) KeyBySeries(b []byte) []byte { + b = append(b, []byte(m.Name)...) + return b +} + func MetricDefinitionFromJSON(b []byte) (*MetricDefinition, error) { def := new(MetricDefinition) if err := json.Unmarshal(b, &def); err != nil { @@ -138,12 +188,6 @@ func MetricDefinitionFromJSON(b []byte) (*MetricDefinition, error) { // MetricDefinitionFromMetricData yields a MetricDefinition that has no references // to the original MetricData func MetricDefinitionFromMetricData(d *MetricData) *MetricDefinition { - nodesMap := make(map[string]string) - nodes := strings.Split(d.Name, ".") - for i, n := range nodes { - key := fmt.Sprintf("n%d", i) - nodesMap[key] = n - } tags := make([]string, len(d.Tags)) copy(tags, d.Tags) return &MetricDefinition{ @@ -156,7 +200,5 @@ func MetricDefinitionFromMetricData(d *MetricData) *MetricDefinition { LastUpdate: d.Time, Unit: d.Unit, Tags: tags, - Nodes: nodesMap, - NodeCount: len(nodes), } } diff --git a/vendor/gopkg.in/raintank/schema.v1/metric_gen.go b/vendor/gopkg.in/raintank/schema.v1/metric_gen.go index c9fb7448..d0f17dc7 100644 --- a/vendor/gopkg.in/raintank/schema.v1/metric_gen.go +++ b/vendor/gopkg.in/raintank/schema.v1/metric_gen.go @@ -4,21 +4,19 @@ package schema // MSGP CODE GENERATION TOOL (github.com/tinylib/msgp) // DO NOT EDIT -import ( - "github.com/tinylib/msgp/msgp" -) +import "github.com/tinylib/msgp/msgp" // DecodeMsg implements msgp.Decodable func (z *MetricData) DecodeMsg(dc *msgp.Reader) (err error) { var field []byte _ = field - var isz uint32 - isz, err = dc.ReadMapHeader() + var zbzg uint32 + zbzg, err = dc.ReadMapHeader() if err != nil { return } - for isz > 0 { - isz-- + for zbzg > 0 { + zbzg-- field, err = dc.ReadMapKeyPtr() if err != nil { return @@ -70,18 +68,18 @@ func (z *MetricData) DecodeMsg(dc *msgp.Reader) (err error) { return } case "Tags": - var xsz uint32 - xsz, err = dc.ReadArrayHeader() + var zbai uint32 + zbai, err = dc.ReadArrayHeader() if err != nil { return } - if cap(z.Tags) >= int(xsz) { - z.Tags = z.Tags[:xsz] + if cap(z.Tags) >= int(zbai) { + z.Tags = (z.Tags)[:zbai] } else { - z.Tags = make([]string, xsz) + z.Tags = make([]string, zbai) } - for xvk := range z.Tags { - z.Tags[xvk], err = dc.ReadString() + for zxvk := range z.Tags { + z.Tags[zxvk], err = dc.ReadString() if err != nil { return } @@ -189,8 +187,8 @@ func (z *MetricData) EncodeMsg(en *msgp.Writer) (err error) { if err != nil { return } - for xvk := range z.Tags { - err = en.WriteString(z.Tags[xvk]) + for zxvk := range z.Tags { + err = en.WriteString(z.Tags[zxvk]) if err != nil { return } @@ -232,8 +230,8 @@ func (z *MetricData) MarshalMsg(b []byte) (o []byte, err error) { // string "Tags" o = append(o, 0xa4, 0x54, 0x61, 0x67, 0x73) o = msgp.AppendArrayHeader(o, uint32(len(z.Tags))) - for xvk := range z.Tags { - o = msgp.AppendString(o, z.Tags[xvk]) + for zxvk := range z.Tags { + o = msgp.AppendString(o, z.Tags[zxvk]) } return } @@ -242,13 +240,13 @@ func (z *MetricData) MarshalMsg(b []byte) (o []byte, err error) { func (z *MetricData) UnmarshalMsg(bts []byte) (o []byte, err error) { var field []byte _ = field - var isz uint32 - isz, bts, err = msgp.ReadMapHeaderBytes(bts) + var zcmr uint32 + zcmr, bts, err = msgp.ReadMapHeaderBytes(bts) if err != nil { return } - for isz > 0 { - isz-- + for zcmr > 0 { + zcmr-- field, bts, err = msgp.ReadMapKeyZC(bts) if err != nil { return @@ -300,18 +298,18 @@ func (z *MetricData) UnmarshalMsg(bts []byte) (o []byte, err error) { return } case "Tags": - var xsz uint32 - xsz, bts, err = msgp.ReadArrayHeaderBytes(bts) + var zajw uint32 + zajw, bts, err = msgp.ReadArrayHeaderBytes(bts) if err != nil { return } - if cap(z.Tags) >= int(xsz) { - z.Tags = z.Tags[:xsz] + if cap(z.Tags) >= int(zajw) { + z.Tags = (z.Tags)[:zajw] } else { - z.Tags = make([]string, xsz) + z.Tags = make([]string, zajw) } - for xvk := range z.Tags { - z.Tags[xvk], bts, err = msgp.ReadStringBytes(bts) + for zxvk := range z.Tags { + z.Tags[zxvk], bts, err = msgp.ReadStringBytes(bts) if err != nil { return } @@ -327,38 +325,39 @@ func (z *MetricData) UnmarshalMsg(bts []byte) (o []byte, err error) { return } +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message func (z *MetricData) Msgsize() (s int) { s = 1 + 3 + msgp.StringPrefixSize + len(z.Id) + 6 + msgp.IntSize + 5 + msgp.StringPrefixSize + len(z.Name) + 7 + msgp.StringPrefixSize + len(z.Metric) + 9 + msgp.IntSize + 6 + msgp.Float64Size + 5 + msgp.StringPrefixSize + len(z.Unit) + 5 + msgp.Int64Size + 6 + msgp.StringPrefixSize + len(z.Mtype) + 5 + msgp.ArrayHeaderSize - for xvk := range z.Tags { - s += msgp.StringPrefixSize + len(z.Tags[xvk]) + for zxvk := range z.Tags { + s += msgp.StringPrefixSize + len(z.Tags[zxvk]) } return } // DecodeMsg implements msgp.Decodable func (z *MetricDataArray) DecodeMsg(dc *msgp.Reader) (err error) { - var xsz uint32 - xsz, err = dc.ReadArrayHeader() + var zcua uint32 + zcua, err = dc.ReadArrayHeader() if err != nil { return } - if cap((*z)) >= int(xsz) { - (*z) = (*z)[:xsz] + if cap((*z)) >= int(zcua) { + (*z) = (*z)[:zcua] } else { - (*z) = make(MetricDataArray, xsz) + (*z) = make(MetricDataArray, zcua) } - for bai := range *z { + for zhct := range *z { if dc.IsNil() { err = dc.ReadNil() if err != nil { return } - (*z)[bai] = nil + (*z)[zhct] = nil } else { - if (*z)[bai] == nil { - (*z)[bai] = new(MetricData) + if (*z)[zhct] == nil { + (*z)[zhct] = new(MetricData) } - err = (*z)[bai].DecodeMsg(dc) + err = (*z)[zhct].DecodeMsg(dc) if err != nil { return } @@ -373,14 +372,14 @@ func (z MetricDataArray) EncodeMsg(en *msgp.Writer) (err error) { if err != nil { return } - for cmr := range z { - if z[cmr] == nil { + for zxhx := range z { + if z[zxhx] == nil { err = en.WriteNil() if err != nil { return } } else { - err = z[cmr].EncodeMsg(en) + err = z[zxhx].EncodeMsg(en) if err != nil { return } @@ -393,11 +392,11 @@ func (z MetricDataArray) EncodeMsg(en *msgp.Writer) (err error) { func (z MetricDataArray) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) o = msgp.AppendArrayHeader(o, uint32(len(z))) - for cmr := range z { - if z[cmr] == nil { + for zxhx := range z { + if z[zxhx] == nil { o = msgp.AppendNil(o) } else { - o, err = z[cmr].MarshalMsg(o) + o, err = z[zxhx].MarshalMsg(o) if err != nil { return } @@ -408,28 +407,28 @@ func (z MetricDataArray) MarshalMsg(b []byte) (o []byte, err error) { // UnmarshalMsg implements msgp.Unmarshaler func (z *MetricDataArray) UnmarshalMsg(bts []byte) (o []byte, err error) { - var xsz uint32 - xsz, bts, err = msgp.ReadArrayHeaderBytes(bts) + var zdaf uint32 + zdaf, bts, err = msgp.ReadArrayHeaderBytes(bts) if err != nil { return } - if cap((*z)) >= int(xsz) { - (*z) = (*z)[:xsz] + if cap((*z)) >= int(zdaf) { + (*z) = (*z)[:zdaf] } else { - (*z) = make(MetricDataArray, xsz) + (*z) = make(MetricDataArray, zdaf) } - for ajw := range *z { + for zlqf := range *z { if msgp.IsNil(bts) { bts, err = msgp.ReadNilBytes(bts) if err != nil { return } - (*z)[ajw] = nil + (*z)[zlqf] = nil } else { - if (*z)[ajw] == nil { - (*z)[ajw] = new(MetricData) + if (*z)[zlqf] == nil { + (*z)[zlqf] = new(MetricData) } - bts, err = (*z)[ajw].UnmarshalMsg(bts) + bts, err = (*z)[zlqf].UnmarshalMsg(bts) if err != nil { return } @@ -439,13 +438,14 @@ func (z *MetricDataArray) UnmarshalMsg(bts []byte) (o []byte, err error) { return } +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message func (z MetricDataArray) Msgsize() (s int) { s = msgp.ArrayHeaderSize - for wht := range z { - if z[wht] == nil { + for zpks := range z { + if z[zpks] == nil { s += msgp.NilSize } else { - s += z[wht].Msgsize() + s += z[zpks].Msgsize() } } return @@ -455,13 +455,13 @@ func (z MetricDataArray) Msgsize() (s int) { func (z *MetricDefinition) DecodeMsg(dc *msgp.Reader) (err error) { var field []byte _ = field - var isz uint32 - isz, err = dc.ReadMapHeader() + var zcxo uint32 + zcxo, err = dc.ReadMapHeader() if err != nil { return } - for isz > 0 { - isz-- + for zcxo > 0 { + zcxo-- field, err = dc.ReadMapKeyPtr() if err != nil { return @@ -503,18 +503,18 @@ func (z *MetricDefinition) DecodeMsg(dc *msgp.Reader) (err error) { return } case "Tags": - var xsz uint32 - xsz, err = dc.ReadArrayHeader() + var zeff uint32 + zeff, err = dc.ReadArrayHeader() if err != nil { return } - if cap(z.Tags) >= int(xsz) { - z.Tags = z.Tags[:xsz] + if cap(z.Tags) >= int(zeff) { + z.Tags = (z.Tags)[:zeff] } else { - z.Tags = make([]string, xsz) + z.Tags = make([]string, zeff) } - for hct := range z.Tags { - z.Tags[hct], err = dc.ReadString() + for zjfb := range z.Tags { + z.Tags[zjfb], err = dc.ReadString() if err != nil { return } @@ -524,35 +524,8 @@ func (z *MetricDefinition) DecodeMsg(dc *msgp.Reader) (err error) { if err != nil { return } - case "Nodes": - var msz uint32 - msz, err = dc.ReadMapHeader() - if err != nil { - return - } - if z.Nodes == nil && msz > 0 { - z.Nodes = make(map[string]string, msz) - } else if len(z.Nodes) > 0 { - for key, _ := range z.Nodes { - delete(z.Nodes, key) - } - } - for msz > 0 { - msz-- - var cua string - var xhx string - cua, err = dc.ReadString() - if err != nil { - return - } - xhx, err = dc.ReadString() - if err != nil { - return - } - z.Nodes[cua] = xhx - } - case "NodeCount": - z.NodeCount, err = dc.ReadInt() + case "Partition": + z.Partition, err = dc.ReadInt32() if err != nil { return } @@ -568,9 +541,9 @@ func (z *MetricDefinition) DecodeMsg(dc *msgp.Reader) (err error) { // EncodeMsg implements msgp.Encodable func (z *MetricDefinition) EncodeMsg(en *msgp.Writer) (err error) { - // map header, size 11 + // map header, size 10 // write "Id" - err = en.Append(0x8b, 0xa2, 0x49, 0x64) + err = en.Append(0x8a, 0xa2, 0x49, 0x64) if err != nil { return err } @@ -641,8 +614,8 @@ func (z *MetricDefinition) EncodeMsg(en *msgp.Writer) (err error) { if err != nil { return } - for hct := range z.Tags { - err = en.WriteString(z.Tags[hct]) + for zjfb := range z.Tags { + err = en.WriteString(z.Tags[zjfb]) if err != nil { return } @@ -656,31 +629,12 @@ func (z *MetricDefinition) EncodeMsg(en *msgp.Writer) (err error) { if err != nil { return } - // write "Nodes" - err = en.Append(0xa5, 0x4e, 0x6f, 0x64, 0x65, 0x73) - if err != nil { - return err - } - err = en.WriteMapHeader(uint32(len(z.Nodes))) - if err != nil { - return - } - for cua, xhx := range z.Nodes { - err = en.WriteString(cua) - if err != nil { - return - } - err = en.WriteString(xhx) - if err != nil { - return - } - } - // write "NodeCount" - err = en.Append(0xa9, 0x4e, 0x6f, 0x64, 0x65, 0x43, 0x6f, 0x75, 0x6e, 0x74) + // write "Partition" + err = en.Append(0xa9, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e) if err != nil { return err } - err = en.WriteInt(z.NodeCount) + err = en.WriteInt32(z.Partition) if err != nil { return } @@ -690,9 +644,9 @@ func (z *MetricDefinition) EncodeMsg(en *msgp.Writer) (err error) { // MarshalMsg implements msgp.Marshaler func (z *MetricDefinition) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) - // map header, size 11 + // map header, size 10 // string "Id" - o = append(o, 0x8b, 0xa2, 0x49, 0x64) + o = append(o, 0x8a, 0xa2, 0x49, 0x64) o = msgp.AppendString(o, z.Id) // string "OrgId" o = append(o, 0xa5, 0x4f, 0x72, 0x67, 0x49, 0x64) @@ -715,22 +669,15 @@ func (z *MetricDefinition) MarshalMsg(b []byte) (o []byte, err error) { // string "Tags" o = append(o, 0xa4, 0x54, 0x61, 0x67, 0x73) o = msgp.AppendArrayHeader(o, uint32(len(z.Tags))) - for hct := range z.Tags { - o = msgp.AppendString(o, z.Tags[hct]) + for zjfb := range z.Tags { + o = msgp.AppendString(o, z.Tags[zjfb]) } // string "LastUpdate" o = append(o, 0xaa, 0x4c, 0x61, 0x73, 0x74, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65) o = msgp.AppendInt64(o, z.LastUpdate) - // string "Nodes" - o = append(o, 0xa5, 0x4e, 0x6f, 0x64, 0x65, 0x73) - o = msgp.AppendMapHeader(o, uint32(len(z.Nodes))) - for cua, xhx := range z.Nodes { - o = msgp.AppendString(o, cua) - o = msgp.AppendString(o, xhx) - } - // string "NodeCount" - o = append(o, 0xa9, 0x4e, 0x6f, 0x64, 0x65, 0x43, 0x6f, 0x75, 0x6e, 0x74) - o = msgp.AppendInt(o, z.NodeCount) + // string "Partition" + o = append(o, 0xa9, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e) + o = msgp.AppendInt32(o, z.Partition) return } @@ -738,13 +685,13 @@ func (z *MetricDefinition) MarshalMsg(b []byte) (o []byte, err error) { func (z *MetricDefinition) UnmarshalMsg(bts []byte) (o []byte, err error) { var field []byte _ = field - var isz uint32 - isz, bts, err = msgp.ReadMapHeaderBytes(bts) + var zrsw uint32 + zrsw, bts, err = msgp.ReadMapHeaderBytes(bts) if err != nil { return } - for isz > 0 { - isz-- + for zrsw > 0 { + zrsw-- field, bts, err = msgp.ReadMapKeyZC(bts) if err != nil { return @@ -786,18 +733,18 @@ func (z *MetricDefinition) UnmarshalMsg(bts []byte) (o []byte, err error) { return } case "Tags": - var xsz uint32 - xsz, bts, err = msgp.ReadArrayHeaderBytes(bts) + var zxpk uint32 + zxpk, bts, err = msgp.ReadArrayHeaderBytes(bts) if err != nil { return } - if cap(z.Tags) >= int(xsz) { - z.Tags = z.Tags[:xsz] + if cap(z.Tags) >= int(zxpk) { + z.Tags = (z.Tags)[:zxpk] } else { - z.Tags = make([]string, xsz) + z.Tags = make([]string, zxpk) } - for hct := range z.Tags { - z.Tags[hct], bts, err = msgp.ReadStringBytes(bts) + for zjfb := range z.Tags { + z.Tags[zjfb], bts, err = msgp.ReadStringBytes(bts) if err != nil { return } @@ -807,35 +754,8 @@ func (z *MetricDefinition) UnmarshalMsg(bts []byte) (o []byte, err error) { if err != nil { return } - case "Nodes": - var msz uint32 - msz, bts, err = msgp.ReadMapHeaderBytes(bts) - if err != nil { - return - } - if z.Nodes == nil && msz > 0 { - z.Nodes = make(map[string]string, msz) - } else if len(z.Nodes) > 0 { - for key, _ := range z.Nodes { - delete(z.Nodes, key) - } - } - for msz > 0 { - var cua string - var xhx string - msz-- - cua, bts, err = msgp.ReadStringBytes(bts) - if err != nil { - return - } - xhx, bts, err = msgp.ReadStringBytes(bts) - if err != nil { - return - } - z.Nodes[cua] = xhx - } - case "NodeCount": - z.NodeCount, bts, err = msgp.ReadIntBytes(bts) + case "Partition": + z.Partition, bts, err = msgp.ReadInt32Bytes(bts) if err != nil { return } @@ -850,18 +770,12 @@ func (z *MetricDefinition) UnmarshalMsg(bts []byte) (o []byte, err error) { return } +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message func (z *MetricDefinition) Msgsize() (s int) { s = 1 + 3 + msgp.StringPrefixSize + len(z.Id) + 6 + msgp.IntSize + 5 + msgp.StringPrefixSize + len(z.Name) + 7 + msgp.StringPrefixSize + len(z.Metric) + 9 + msgp.IntSize + 5 + msgp.StringPrefixSize + len(z.Unit) + 6 + msgp.StringPrefixSize + len(z.Mtype) + 5 + msgp.ArrayHeaderSize - for hct := range z.Tags { - s += msgp.StringPrefixSize + len(z.Tags[hct]) - } - s += 11 + msgp.Int64Size + 6 + msgp.MapHeaderSize - if z.Nodes != nil { - for cua, xhx := range z.Nodes { - _ = xhx - s += msgp.StringPrefixSize + len(cua) + msgp.StringPrefixSize + len(xhx) - } + for zjfb := range z.Tags { + s += msgp.StringPrefixSize + len(z.Tags[zjfb]) } - s += 10 + msgp.IntSize + s += 11 + msgp.Int64Size + 10 + msgp.Int32Size return } diff --git a/vendor/gopkg.in/raintank/schema.v1/point.go b/vendor/gopkg.in/raintank/schema.v1/point.go index abfff00f..71c8517e 100644 --- a/vendor/gopkg.in/raintank/schema.v1/point.go +++ b/vendor/gopkg.in/raintank/schema.v1/point.go @@ -1,5 +1,6 @@ package schema +//go:generate msgp type Point struct { Val float64 Ts uint32 diff --git a/vendor/gopkg.in/raintank/schema.v1/point_gen.go b/vendor/gopkg.in/raintank/schema.v1/point_gen.go new file mode 100644 index 00000000..6843d766 --- /dev/null +++ b/vendor/gopkg.in/raintank/schema.v1/point_gen.go @@ -0,0 +1,125 @@ +package schema + +// NOTE: THIS FILE WAS PRODUCED BY THE +// MSGP CODE GENERATION TOOL (github.com/tinylib/msgp) +// DO NOT EDIT + +import ( + "github.com/tinylib/msgp/msgp" +) + +// DecodeMsg implements msgp.Decodable +func (z *Point) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zxvk uint32 + zxvk, err = dc.ReadMapHeader() + if err != nil { + return + } + for zxvk > 0 { + zxvk-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + return + } + switch msgp.UnsafeString(field) { + case "Val": + z.Val, err = dc.ReadFloat64() + if err != nil { + return + } + case "Ts": + z.Ts, err = dc.ReadUint32() + if err != nil { + return + } + default: + err = dc.Skip() + if err != nil { + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z Point) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 2 + // write "Val" + err = en.Append(0x82, 0xa3, 0x56, 0x61, 0x6c) + if err != nil { + return err + } + err = en.WriteFloat64(z.Val) + if err != nil { + return + } + // write "Ts" + err = en.Append(0xa2, 0x54, 0x73) + if err != nil { + return err + } + err = en.WriteUint32(z.Ts) + if err != nil { + return + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z Point) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + // map header, size 2 + // string "Val" + o = append(o, 0x82, 0xa3, 0x56, 0x61, 0x6c) + o = msgp.AppendFloat64(o, z.Val) + // string "Ts" + o = append(o, 0xa2, 0x54, 0x73) + o = msgp.AppendUint32(o, z.Ts) + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *Point) UnmarshalMsg(bts []byte) (o []byte, err error) { + var field []byte + _ = field + var zbzg uint32 + zbzg, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + return + } + for zbzg > 0 { + zbzg-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + return + } + switch msgp.UnsafeString(field) { + case "Val": + z.Val, bts, err = msgp.ReadFloat64Bytes(bts) + if err != nil { + return + } + case "Ts": + z.Ts, bts, err = msgp.ReadUint32Bytes(bts) + if err != nil { + return + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + return + } + } + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z Point) Msgsize() (s int) { + s = 1 + 4 + msgp.Float64Size + 3 + msgp.Uint32Size + return +} diff --git a/vendor/vendor.json b/vendor/vendor.json index aa77b149..d0d7a25f 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -153,16 +153,16 @@ "revisionTime": "2015-10-23T22:38:53Z" }, { - "checksumSHA1": "Fv3Cdgosf28iev/JTFWO6nsPFCA=", + "checksumSHA1": "coHpC7KbWRXTjC0JM/bwnBn8zmY=", "path": "gopkg.in/raintank/schema.v1", - "revision": "a2ba0a7af66d6cc470d93bb5725f22f4b3c1ea79", - "revisionTime": "2016-07-20T14:31:18Z" + "revision": "a323316458b5df84827551e9b6b5f61cb2de423b", + "revisionTime": "2017-01-12T12:37:55Z" }, { "checksumSHA1": "MjwLYwIckCtBwXhQMt5ji/L5lJA=", "path": "gopkg.in/raintank/schema.v1/msg", - "revision": "a2ba0a7af66d6cc470d93bb5725f22f4b3c1ea79", - "revisionTime": "2016-07-20T14:31:18Z" + "revision": "a323316458b5df84827551e9b6b5f61cb2de423b", + "revisionTime": "2017-01-12T12:37:55Z" } ], "rootPath": "github.com/graphite-ng/carbon-relay-ng" From 367e248e7bd3b46dc00ae57ece1ba9744a76c06a Mon Sep 17 00:00:00 2001 From: woodsaj Date: Wed, 25 Jan 2017 17:25:39 +0800 Subject: [PATCH 2/4] allow grafanNet route to make concurrent connections to tsdb-gw - adds a "concurrency" config setting for defining the maximum number of concurrent http requests that can be made - split making http requests into separate goroutines so they dont block the main loop. This allows new metrics to be parsed and added to the buffer while the "flush" goroutines are IO blocked. - shard metrics into multiple buffers based on concurrency. This ensures any delays procsesing a buffer dont lead to metrics being sent out of order. - send compressed (snappy) payload to tsdb-gw, results in a 1/4 the bandwidth usage.. --- imperatives/imperatives.go | 17 +- route/grafananet.go | 143 ++-- vendor/github.com/golang/snappy/AUTHORS | 15 + vendor/github.com/golang/snappy/CONTRIBUTORS | 37 + vendor/github.com/golang/snappy/LICENSE | 27 + vendor/github.com/golang/snappy/README | 107 +++ vendor/github.com/golang/snappy/decode.go | 237 ++++++ .../github.com/golang/snappy/decode_amd64.go | 14 + .../github.com/golang/snappy/decode_amd64.s | 490 ++++++++++++ .../github.com/golang/snappy/decode_other.go | 101 +++ vendor/github.com/golang/snappy/encode.go | 285 +++++++ .../github.com/golang/snappy/encode_amd64.go | 29 + .../github.com/golang/snappy/encode_amd64.s | 730 ++++++++++++++++++ .../github.com/golang/snappy/encode_other.go | 238 ++++++ vendor/github.com/golang/snappy/snappy.go | 87 +++ vendor/vendor.json | 6 + 16 files changed, 2517 insertions(+), 46 deletions(-) create mode 100644 vendor/github.com/golang/snappy/AUTHORS create mode 100644 vendor/github.com/golang/snappy/CONTRIBUTORS create mode 100644 vendor/github.com/golang/snappy/LICENSE create mode 100644 vendor/github.com/golang/snappy/README create mode 100644 vendor/github.com/golang/snappy/decode.go create mode 100644 vendor/github.com/golang/snappy/decode_amd64.go create mode 100644 vendor/github.com/golang/snappy/decode_amd64.s create mode 100644 vendor/github.com/golang/snappy/decode_other.go create mode 100644 vendor/github.com/golang/snappy/encode.go create mode 100644 vendor/github.com/golang/snappy/encode_amd64.go create mode 100644 vendor/github.com/golang/snappy/encode_amd64.s create mode 100644 vendor/github.com/golang/snappy/encode_other.go create mode 100644 vendor/github.com/golang/snappy/snappy.go diff --git a/imperatives/imperatives.go b/imperatives/imperatives.go index bd6c35a5..e4833823 100644 --- a/imperatives/imperatives.go +++ b/imperatives/imperatives.go @@ -49,6 +49,7 @@ const ( optTimeout optSSLVerify word + optConcurrency ) // we should make sure we apply changes atomatically. e.g. when changing dest between address A and pickle=false and B with pickle=true, @@ -81,6 +82,7 @@ var tokens = []toki.Def{ {Token: optFlushMaxWait, Pattern: "flushMaxWait="}, {Token: optTimeout, Pattern: "timeout="}, {Token: optSSLVerify, Pattern: "sslverify="}, + {Token: optConcurrency, Pattern: "concurrency="}, {Token: str, Pattern: "\".*\""}, {Token: sep, Pattern: "##"}, {Token: sumFn, Pattern: "sum"}, @@ -94,7 +96,7 @@ var tokens = []toki.Def{ var errFmtAddBlack = errors.New("addBlack ") var errFmtAddAgg = errors.New("addAgg ") var errFmtAddRoute = errors.New("addRoute [prefix/sub/regex=,..] [[...]] where is [prefix/sub,regex,flush,reconn,pickle,spool=...]") // note flush and reconn are ints, pickle and spool are true/false. other options are strings -var errFmtAddRouteGrafanaNet = errors.New("addRoute GrafanaNet key [prefix/sub/regex] addr apiKey schemasFile [spool=true/false sslverify=true/false bufSize=int flushMaxNum=int flushMaxWait=int timeout=int]") +var errFmtAddRouteGrafanaNet = errors.New("addRoute GrafanaNet key [prefix/sub/regex] addr apiKey schemasFile [spool=true/false sslverify=true/false bufSize=int flushMaxNum=int flushMaxWait=int timeout=int concurrency=int]") var errFmtAddDest = errors.New("addDest ") // not implemented yet var errFmtAddRewriter = errors.New("addRewriter ") var errFmtModDest = errors.New("modDest ") // one or more can be specified at once @@ -302,6 +304,7 @@ func readAddRouteGrafanaNet(s *toki.Scanner, table *tbl.Table) error { var flushMaxNum = 10000 // number of metrics var flushMaxWait = 500 // in ms var timeout = 2000 // in ms + var concurrency = 10 // number of concurrent connections to tsdb-gw for ; t.Token != toki.EOF; t = s.Next() { switch t.Token { @@ -345,6 +348,16 @@ func readAddRouteGrafanaNet(s *toki.Scanner, table *tbl.Table) error { } else { return errFmtAddRouteGrafanaNet } + case optConcurrency: + t = s.Next() + if t.Token == num { + concurrency, err = strconv.Atoi(strings.TrimSpace(string(t.Value))) + if err != nil { + return err + } + } else { + return errFmtAddRouteGrafanaNet + } case optTimeout: t = s.Next() if t.Token == num { @@ -371,7 +384,7 @@ func readAddRouteGrafanaNet(s *toki.Scanner, table *tbl.Table) error { } } - route, err := route.NewGrafanaNet(key, prefix, sub, regex, addr, apiKey, schemasFile, spool, sslVerify, bufSize, flushMaxNum, flushMaxWait, timeout) + route, err := route.NewGrafanaNet(key, prefix, sub, regex, addr, apiKey, schemasFile, spool, sslVerify, bufSize, flushMaxNum, flushMaxWait, timeout, concurrency) if err != nil { return err } diff --git a/route/grafananet.go b/route/grafananet.go index 147cc41b..35c22ea7 100644 --- a/route/grafananet.go +++ b/route/grafananet.go @@ -4,6 +4,7 @@ import ( "bytes" "crypto/tls" "fmt" + "hash/fnv" "net" "net/http" "strconv" @@ -13,6 +14,7 @@ import ( "time" "github.com/Dieterbe/go-metrics" + "github.com/golang/snappy" dest "github.com/graphite-ng/carbon-relay-ng/destination" "github.com/graphite-ng/carbon-relay-ng/matcher" "github.com/graphite-ng/carbon-relay-ng/stats" @@ -36,6 +38,11 @@ type GrafanaNet struct { flushMaxWait time.Duration timeout time.Duration sslVerify bool + concurrency int + writeQueue []chan []byte + shutdown chan struct{} + wg *sync.WaitGroup + client *http.Client numErrFlush metrics.Counter numOut metrics.Counter // metrics successfully written to our buffered conn (no flushing yet) @@ -49,7 +56,7 @@ type GrafanaNet struct { // NewGrafanaNet creates a special route that writes to a grafana.net datastore // We will automatically run the route and the destination // ignores spool for now -func NewGrafanaNet(key, prefix, sub, regex, addr, apiKey, schemasFile string, spool, sslVerify bool, bufSize, flushMaxNum, flushMaxWait, timeout int) (Route, error) { +func NewGrafanaNet(key, prefix, sub, regex, addr, apiKey, schemasFile string, spool, sslVerify bool, bufSize, flushMaxNum, flushMaxWait, timeout, concurrency int) (Route, error) { m, err := matcher.New(prefix, sub, regex) if err != nil { return nil, err @@ -87,6 +94,10 @@ func NewGrafanaNet(key, prefix, sub, regex, addr, apiKey, schemasFile string, sp flushMaxWait: time.Duration(flushMaxWait) * time.Millisecond, timeout: time.Duration(timeout) * time.Millisecond, sslVerify: sslVerify, + concurrency: concurrency, + writeQueue: make([]chan []byte, concurrency), + shutdown: make(chan struct{}), + wg: new(sync.WaitGroup), numErrFlush: stats.Counter("dest=" + cleanAddr + ".unit=Err.type=flush"), numOut: stats.Counter("dest=" + cleanAddr + ".unit=Metric.direction=out"), @@ -96,21 +107,17 @@ func NewGrafanaNet(key, prefix, sub, regex, addr, apiKey, schemasFile string, sp manuFlushSize: stats.Histogram("dest=" + cleanAddr + ".unit=B.what=FlushSize.type=manual"), numBuffered: stats.Gauge("dest=" + cleanAddr + ".unit=Metric.what=numBuffered"), } - + for i := 0; i < r.concurrency; i++ { + r.writeQueue[i] = make(chan []byte) + } r.config.Store(baseConfig{*m, make([]*dest.Destination, 0)}) - go r.run() - return r, nil -} -func (route *GrafanaNet) run() { - metrics := make([]*schema.MetricData, 0, route.flushMaxNum) - ticker := time.NewTicker(route.flushMaxWait) - client := &http.Client{ - Timeout: route.timeout, + r.client = &http.Client{ + Timeout: r.timeout, } - if !route.sslVerify { + if !r.sslVerify { // this transport should be the equivalent of Go's DefaultTransport - client.Transport = &http.Transport{ + r.client.Transport = &http.Transport{ Proxy: http.ProxyFromEnvironment, Dial: (&net.Dialer{ Timeout: 30 * time.Second, @@ -125,41 +132,99 @@ func (route *GrafanaNet) run() { } } - b := &backoff.Backoff{ - Min: 100 * time.Millisecond, - Max: time.Minute, - Factor: 1.5, - Jitter: true, + go r.run() + return r, nil +} + +func (route *GrafanaNet) run() { + metrics := make([][]*schema.MetricData, route.concurrency) + for i := 0; i < route.concurrency; i++ { + metrics[i] = make([]*schema.MetricData, 0, route.flushMaxNum) + + // start up our goroutines for writing data to tsdb-gw + route.wg.Add(1) + go route.flush(i) } - flush := func() { - if len(metrics) == 0 { + flush := func(shard int) { + if len(metrics[shard]) == 0 { return } - - mda := schema.MetricDataArray(metrics) + mda := schema.MetricDataArray(metrics[shard]) data, err := msg.CreateMsg(mda, 0, msg.FormatMetricDataArrayMsgp) if err != nil { panic(err) } + route.writeQueue[shard] <- data + route.numOut.Inc(int64(len(metrics[shard]))) + metrics[shard] = metrics[shard][:0] + + } + + hasher := fnv.New32a() + + ticker := time.NewTicker(route.flushMaxWait) + for { + select { + case buf := <-route.buf: + route.numBuffered.Dec(1) + md := parseMetric(buf, route.schemas) + if md == nil { + continue + } + md.SetId() + buf = md.KeyBySeries(buf[:0]) + hasher.Reset() + hasher.Write(buf) + shard := int(hasher.Sum32() % uint32(route.concurrency)) + metrics[shard] = append(metrics[shard], md) + if len(metrics[shard]) == route.flushMaxNum { + flush(shard) + } + case <-ticker.C: + for shard := 0; shard < route.concurrency; shard++ { + flush(shard) + } + case <-route.shutdown: + for shard := 0; shard < route.concurrency; shard++ { + flush(shard) + close(route.writeQueue[shard]) + } + return + } + } +} +func (route *GrafanaNet) flush(shard int) { + b := &backoff.Backoff{ + Min: 100 * time.Millisecond, + Max: time.Minute, + Factor: 1.5, + Jitter: true, + } + body := new(bytes.Buffer) + for data := range route.writeQueue[shard] { for { pre := time.Now() - req, err := http.NewRequest("POST", route.addr, bytes.NewBuffer(data)) + body.Reset() + snappyBody := snappy.NewWriter(body) + snappyBody.Write(data) + snappyBody.Close() + req, err := http.NewRequest("POST", route.addr, body) if err != nil { panic(err) } req.Header.Add("Authorization", "Bearer "+route.apiKey) - req.Header.Add("Content-Type", "rt-metric-binary") - resp, err := client.Do(req) + req.Header.Add("Content-Type", "rt-metric-binary-snappy") + resp, err := route.client.Do(req) diff := time.Since(pre) if err == nil && resp.StatusCode >= 200 && resp.StatusCode < 300 { b.Reset() - log.Info("GrafanaNet sent %d metrics in %s -msg size %d", len(metrics), diff, len(data)) - route.numOut.Inc(int64(len(metrics))) + log.Info("GrafanaNet sent metrics in %s -msg size %d", diff, len(data)) + route.tickFlushSize.Update(int64(len(data))) route.durationTickFlush.Update(diff) - metrics = metrics[:0] + resp.Body.Close() break } @@ -177,23 +242,7 @@ func (route *GrafanaNet) run() { time.Sleep(dur) } } - for { - select { - case buf := <-route.buf: - route.numBuffered.Dec(1) - md := parseMetric(buf, route.schemas) - if md == nil { - continue - } - md.SetId() - metrics = append(metrics, md) - if len(metrics) == route.flushMaxNum { - flush() - } - case <-ticker.C: - flush() - } - } + route.wg.Done() } func parseMetric(buf []byte, schemas persister.WhisperSchemas) *schema.MetricData { @@ -251,6 +300,12 @@ func (route *GrafanaNet) Flush() error { func (route *GrafanaNet) Shutdown() error { //conf := route.config.Load().(Config) + + // trigger all of our queues to be flushed to the tsdb-gw + route.shutdown <- struct{}{} + + // wait for all tsdb-gw writes to complete. + route.wg.Wait() return nil } diff --git a/vendor/github.com/golang/snappy/AUTHORS b/vendor/github.com/golang/snappy/AUTHORS new file mode 100644 index 00000000..bcfa1952 --- /dev/null +++ b/vendor/github.com/golang/snappy/AUTHORS @@ -0,0 +1,15 @@ +# This is the official list of Snappy-Go authors for copyright purposes. +# This file is distinct from the CONTRIBUTORS files. +# See the latter for an explanation. + +# Names should be added to this file as +# Name or Organization +# The email address is not required for organizations. + +# Please keep the list sorted. + +Damian Gryski +Google Inc. +Jan Mercl <0xjnml@gmail.com> +Rodolfo Carvalho +Sebastien Binet diff --git a/vendor/github.com/golang/snappy/CONTRIBUTORS b/vendor/github.com/golang/snappy/CONTRIBUTORS new file mode 100644 index 00000000..931ae316 --- /dev/null +++ b/vendor/github.com/golang/snappy/CONTRIBUTORS @@ -0,0 +1,37 @@ +# This is the official list of people who can contribute +# (and typically have contributed) code to the Snappy-Go repository. +# The AUTHORS file lists the copyright holders; this file +# lists people. For example, Google employees are listed here +# but not in AUTHORS, because Google holds the copyright. +# +# The submission process automatically checks to make sure +# that people submitting code are listed in this file (by email address). +# +# Names should be added to this file only after verifying that +# the individual or the individual's organization has agreed to +# the appropriate Contributor License Agreement, found here: +# +# http://code.google.com/legal/individual-cla-v1.0.html +# http://code.google.com/legal/corporate-cla-v1.0.html +# +# The agreement for individuals can be filled out on the web. +# +# When adding J Random Contributor's name to this file, +# either J's name or J's organization's name should be +# added to the AUTHORS file, depending on whether the +# individual or corporate CLA was used. + +# Names should be added to this file like so: +# Name + +# Please keep the list sorted. + +Damian Gryski +Jan Mercl <0xjnml@gmail.com> +Kai Backman +Marc-Antoine Ruel +Nigel Tao +Rob Pike +Rodolfo Carvalho +Russ Cox +Sebastien Binet diff --git a/vendor/github.com/golang/snappy/LICENSE b/vendor/github.com/golang/snappy/LICENSE new file mode 100644 index 00000000..6050c10f --- /dev/null +++ b/vendor/github.com/golang/snappy/LICENSE @@ -0,0 +1,27 @@ +Copyright (c) 2011 The Snappy-Go Authors. All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above +copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the +distribution. + * Neither the name of Google Inc. nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/vendor/github.com/golang/snappy/README b/vendor/github.com/golang/snappy/README new file mode 100644 index 00000000..cea12879 --- /dev/null +++ b/vendor/github.com/golang/snappy/README @@ -0,0 +1,107 @@ +The Snappy compression format in the Go programming language. + +To download and install from source: +$ go get github.com/golang/snappy + +Unless otherwise noted, the Snappy-Go source files are distributed +under the BSD-style license found in the LICENSE file. + + + +Benchmarks. + +The golang/snappy benchmarks include compressing (Z) and decompressing (U) ten +or so files, the same set used by the C++ Snappy code (github.com/google/snappy +and note the "google", not "golang"). On an "Intel(R) Core(TM) i7-3770 CPU @ +3.40GHz", Go's GOARCH=amd64 numbers as of 2016-05-29: + +"go test -test.bench=." + +_UFlat0-8 2.19GB/s ± 0% html +_UFlat1-8 1.41GB/s ± 0% urls +_UFlat2-8 23.5GB/s ± 2% jpg +_UFlat3-8 1.91GB/s ± 0% jpg_200 +_UFlat4-8 14.0GB/s ± 1% pdf +_UFlat5-8 1.97GB/s ± 0% html4 +_UFlat6-8 814MB/s ± 0% txt1 +_UFlat7-8 785MB/s ± 0% txt2 +_UFlat8-8 857MB/s ± 0% txt3 +_UFlat9-8 719MB/s ± 1% txt4 +_UFlat10-8 2.84GB/s ± 0% pb +_UFlat11-8 1.05GB/s ± 0% gaviota + +_ZFlat0-8 1.04GB/s ± 0% html +_ZFlat1-8 534MB/s ± 0% urls +_ZFlat2-8 15.7GB/s ± 1% jpg +_ZFlat3-8 740MB/s ± 3% jpg_200 +_ZFlat4-8 9.20GB/s ± 1% pdf +_ZFlat5-8 991MB/s ± 0% html4 +_ZFlat6-8 379MB/s ± 0% txt1 +_ZFlat7-8 352MB/s ± 0% txt2 +_ZFlat8-8 396MB/s ± 1% txt3 +_ZFlat9-8 327MB/s ± 1% txt4 +_ZFlat10-8 1.33GB/s ± 1% pb +_ZFlat11-8 605MB/s ± 1% gaviota + + + +"go test -test.bench=. -tags=noasm" + +_UFlat0-8 621MB/s ± 2% html +_UFlat1-8 494MB/s ± 1% urls +_UFlat2-8 23.2GB/s ± 1% jpg +_UFlat3-8 1.12GB/s ± 1% jpg_200 +_UFlat4-8 4.35GB/s ± 1% pdf +_UFlat5-8 609MB/s ± 0% html4 +_UFlat6-8 296MB/s ± 0% txt1 +_UFlat7-8 288MB/s ± 0% txt2 +_UFlat8-8 309MB/s ± 1% txt3 +_UFlat9-8 280MB/s ± 1% txt4 +_UFlat10-8 753MB/s ± 0% pb +_UFlat11-8 400MB/s ± 0% gaviota + +_ZFlat0-8 409MB/s ± 1% html +_ZFlat1-8 250MB/s ± 1% urls +_ZFlat2-8 12.3GB/s ± 1% jpg +_ZFlat3-8 132MB/s ± 0% jpg_200 +_ZFlat4-8 2.92GB/s ± 0% pdf +_ZFlat5-8 405MB/s ± 1% html4 +_ZFlat6-8 179MB/s ± 1% txt1 +_ZFlat7-8 170MB/s ± 1% txt2 +_ZFlat8-8 189MB/s ± 1% txt3 +_ZFlat9-8 164MB/s ± 1% txt4 +_ZFlat10-8 479MB/s ± 1% pb +_ZFlat11-8 270MB/s ± 1% gaviota + + + +For comparison (Go's encoded output is byte-for-byte identical to C++'s), here +are the numbers from C++ Snappy's + +make CXXFLAGS="-O2 -DNDEBUG -g" clean snappy_unittest.log && cat snappy_unittest.log + +BM_UFlat/0 2.4GB/s html +BM_UFlat/1 1.4GB/s urls +BM_UFlat/2 21.8GB/s jpg +BM_UFlat/3 1.5GB/s jpg_200 +BM_UFlat/4 13.3GB/s pdf +BM_UFlat/5 2.1GB/s html4 +BM_UFlat/6 1.0GB/s txt1 +BM_UFlat/7 959.4MB/s txt2 +BM_UFlat/8 1.0GB/s txt3 +BM_UFlat/9 864.5MB/s txt4 +BM_UFlat/10 2.9GB/s pb +BM_UFlat/11 1.2GB/s gaviota + +BM_ZFlat/0 944.3MB/s html (22.31 %) +BM_ZFlat/1 501.6MB/s urls (47.78 %) +BM_ZFlat/2 14.3GB/s jpg (99.95 %) +BM_ZFlat/3 538.3MB/s jpg_200 (73.00 %) +BM_ZFlat/4 8.3GB/s pdf (83.30 %) +BM_ZFlat/5 903.5MB/s html4 (22.52 %) +BM_ZFlat/6 336.0MB/s txt1 (57.88 %) +BM_ZFlat/7 312.3MB/s txt2 (61.91 %) +BM_ZFlat/8 353.1MB/s txt3 (54.99 %) +BM_ZFlat/9 289.9MB/s txt4 (66.26 %) +BM_ZFlat/10 1.2GB/s pb (19.68 %) +BM_ZFlat/11 527.4MB/s gaviota (37.72 %) diff --git a/vendor/github.com/golang/snappy/decode.go b/vendor/github.com/golang/snappy/decode.go new file mode 100644 index 00000000..72efb035 --- /dev/null +++ b/vendor/github.com/golang/snappy/decode.go @@ -0,0 +1,237 @@ +// Copyright 2011 The Snappy-Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package snappy + +import ( + "encoding/binary" + "errors" + "io" +) + +var ( + // ErrCorrupt reports that the input is invalid. + ErrCorrupt = errors.New("snappy: corrupt input") + // ErrTooLarge reports that the uncompressed length is too large. + ErrTooLarge = errors.New("snappy: decoded block is too large") + // ErrUnsupported reports that the input isn't supported. + ErrUnsupported = errors.New("snappy: unsupported input") + + errUnsupportedLiteralLength = errors.New("snappy: unsupported literal length") +) + +// DecodedLen returns the length of the decoded block. +func DecodedLen(src []byte) (int, error) { + v, _, err := decodedLen(src) + return v, err +} + +// decodedLen returns the length of the decoded block and the number of bytes +// that the length header occupied. +func decodedLen(src []byte) (blockLen, headerLen int, err error) { + v, n := binary.Uvarint(src) + if n <= 0 || v > 0xffffffff { + return 0, 0, ErrCorrupt + } + + const wordSize = 32 << (^uint(0) >> 32 & 1) + if wordSize == 32 && v > 0x7fffffff { + return 0, 0, ErrTooLarge + } + return int(v), n, nil +} + +const ( + decodeErrCodeCorrupt = 1 + decodeErrCodeUnsupportedLiteralLength = 2 +) + +// Decode returns the decoded form of src. The returned slice may be a sub- +// slice of dst if dst was large enough to hold the entire decoded block. +// Otherwise, a newly allocated slice will be returned. +// +// The dst and src must not overlap. It is valid to pass a nil dst. +func Decode(dst, src []byte) ([]byte, error) { + dLen, s, err := decodedLen(src) + if err != nil { + return nil, err + } + if dLen <= len(dst) { + dst = dst[:dLen] + } else { + dst = make([]byte, dLen) + } + switch decode(dst, src[s:]) { + case 0: + return dst, nil + case decodeErrCodeUnsupportedLiteralLength: + return nil, errUnsupportedLiteralLength + } + return nil, ErrCorrupt +} + +// NewReader returns a new Reader that decompresses from r, using the framing +// format described at +// https://github.com/google/snappy/blob/master/framing_format.txt +func NewReader(r io.Reader) *Reader { + return &Reader{ + r: r, + decoded: make([]byte, maxBlockSize), + buf: make([]byte, maxEncodedLenOfMaxBlockSize+checksumSize), + } +} + +// Reader is an io.Reader that can read Snappy-compressed bytes. +type Reader struct { + r io.Reader + err error + decoded []byte + buf []byte + // decoded[i:j] contains decoded bytes that have not yet been passed on. + i, j int + readHeader bool +} + +// Reset discards any buffered data, resets all state, and switches the Snappy +// reader to read from r. This permits reusing a Reader rather than allocating +// a new one. +func (r *Reader) Reset(reader io.Reader) { + r.r = reader + r.err = nil + r.i = 0 + r.j = 0 + r.readHeader = false +} + +func (r *Reader) readFull(p []byte, allowEOF bool) (ok bool) { + if _, r.err = io.ReadFull(r.r, p); r.err != nil { + if r.err == io.ErrUnexpectedEOF || (r.err == io.EOF && !allowEOF) { + r.err = ErrCorrupt + } + return false + } + return true +} + +// Read satisfies the io.Reader interface. +func (r *Reader) Read(p []byte) (int, error) { + if r.err != nil { + return 0, r.err + } + for { + if r.i < r.j { + n := copy(p, r.decoded[r.i:r.j]) + r.i += n + return n, nil + } + if !r.readFull(r.buf[:4], true) { + return 0, r.err + } + chunkType := r.buf[0] + if !r.readHeader { + if chunkType != chunkTypeStreamIdentifier { + r.err = ErrCorrupt + return 0, r.err + } + r.readHeader = true + } + chunkLen := int(r.buf[1]) | int(r.buf[2])<<8 | int(r.buf[3])<<16 + if chunkLen > len(r.buf) { + r.err = ErrUnsupported + return 0, r.err + } + + // The chunk types are specified at + // https://github.com/google/snappy/blob/master/framing_format.txt + switch chunkType { + case chunkTypeCompressedData: + // Section 4.2. Compressed data (chunk type 0x00). + if chunkLen < checksumSize { + r.err = ErrCorrupt + return 0, r.err + } + buf := r.buf[:chunkLen] + if !r.readFull(buf, false) { + return 0, r.err + } + checksum := uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24 + buf = buf[checksumSize:] + + n, err := DecodedLen(buf) + if err != nil { + r.err = err + return 0, r.err + } + if n > len(r.decoded) { + r.err = ErrCorrupt + return 0, r.err + } + if _, err := Decode(r.decoded, buf); err != nil { + r.err = err + return 0, r.err + } + if crc(r.decoded[:n]) != checksum { + r.err = ErrCorrupt + return 0, r.err + } + r.i, r.j = 0, n + continue + + case chunkTypeUncompressedData: + // Section 4.3. Uncompressed data (chunk type 0x01). + if chunkLen < checksumSize { + r.err = ErrCorrupt + return 0, r.err + } + buf := r.buf[:checksumSize] + if !r.readFull(buf, false) { + return 0, r.err + } + checksum := uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24 + // Read directly into r.decoded instead of via r.buf. + n := chunkLen - checksumSize + if n > len(r.decoded) { + r.err = ErrCorrupt + return 0, r.err + } + if !r.readFull(r.decoded[:n], false) { + return 0, r.err + } + if crc(r.decoded[:n]) != checksum { + r.err = ErrCorrupt + return 0, r.err + } + r.i, r.j = 0, n + continue + + case chunkTypeStreamIdentifier: + // Section 4.1. Stream identifier (chunk type 0xff). + if chunkLen != len(magicBody) { + r.err = ErrCorrupt + return 0, r.err + } + if !r.readFull(r.buf[:len(magicBody)], false) { + return 0, r.err + } + for i := 0; i < len(magicBody); i++ { + if r.buf[i] != magicBody[i] { + r.err = ErrCorrupt + return 0, r.err + } + } + continue + } + + if chunkType <= 0x7f { + // Section 4.5. Reserved unskippable chunks (chunk types 0x02-0x7f). + r.err = ErrUnsupported + return 0, r.err + } + // Section 4.4 Padding (chunk type 0xfe). + // Section 4.6. Reserved skippable chunks (chunk types 0x80-0xfd). + if !r.readFull(r.buf[:chunkLen], false) { + return 0, r.err + } + } +} diff --git a/vendor/github.com/golang/snappy/decode_amd64.go b/vendor/github.com/golang/snappy/decode_amd64.go new file mode 100644 index 00000000..fcd192b8 --- /dev/null +++ b/vendor/github.com/golang/snappy/decode_amd64.go @@ -0,0 +1,14 @@ +// Copyright 2016 The Snappy-Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build !appengine +// +build gc +// +build !noasm + +package snappy + +// decode has the same semantics as in decode_other.go. +// +//go:noescape +func decode(dst, src []byte) int diff --git a/vendor/github.com/golang/snappy/decode_amd64.s b/vendor/github.com/golang/snappy/decode_amd64.s new file mode 100644 index 00000000..e6179f65 --- /dev/null +++ b/vendor/github.com/golang/snappy/decode_amd64.s @@ -0,0 +1,490 @@ +// Copyright 2016 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build !appengine +// +build gc +// +build !noasm + +#include "textflag.h" + +// The asm code generally follows the pure Go code in decode_other.go, except +// where marked with a "!!!". + +// func decode(dst, src []byte) int +// +// All local variables fit into registers. The non-zero stack size is only to +// spill registers and push args when issuing a CALL. The register allocation: +// - AX scratch +// - BX scratch +// - CX length or x +// - DX offset +// - SI &src[s] +// - DI &dst[d] +// + R8 dst_base +// + R9 dst_len +// + R10 dst_base + dst_len +// + R11 src_base +// + R12 src_len +// + R13 src_base + src_len +// - R14 used by doCopy +// - R15 used by doCopy +// +// The registers R8-R13 (marked with a "+") are set at the start of the +// function, and after a CALL returns, and are not otherwise modified. +// +// The d variable is implicitly DI - R8, and len(dst)-d is R10 - DI. +// The s variable is implicitly SI - R11, and len(src)-s is R13 - SI. +TEXT ·decode(SB), NOSPLIT, $48-56 + // Initialize SI, DI and R8-R13. + MOVQ dst_base+0(FP), R8 + MOVQ dst_len+8(FP), R9 + MOVQ R8, DI + MOVQ R8, R10 + ADDQ R9, R10 + MOVQ src_base+24(FP), R11 + MOVQ src_len+32(FP), R12 + MOVQ R11, SI + MOVQ R11, R13 + ADDQ R12, R13 + +loop: + // for s < len(src) + CMPQ SI, R13 + JEQ end + + // CX = uint32(src[s]) + // + // switch src[s] & 0x03 + MOVBLZX (SI), CX + MOVL CX, BX + ANDL $3, BX + CMPL BX, $1 + JAE tagCopy + + // ---------------------------------------- + // The code below handles literal tags. + + // case tagLiteral: + // x := uint32(src[s] >> 2) + // switch + SHRL $2, CX + CMPL CX, $60 + JAE tagLit60Plus + + // case x < 60: + // s++ + INCQ SI + +doLit: + // This is the end of the inner "switch", when we have a literal tag. + // + // We assume that CX == x and x fits in a uint32, where x is the variable + // used in the pure Go decode_other.go code. + + // length = int(x) + 1 + // + // Unlike the pure Go code, we don't need to check if length <= 0 because + // CX can hold 64 bits, so the increment cannot overflow. + INCQ CX + + // Prepare to check if copying length bytes will run past the end of dst or + // src. + // + // AX = len(dst) - d + // BX = len(src) - s + MOVQ R10, AX + SUBQ DI, AX + MOVQ R13, BX + SUBQ SI, BX + + // !!! Try a faster technique for short (16 or fewer bytes) copies. + // + // if length > 16 || len(dst)-d < 16 || len(src)-s < 16 { + // goto callMemmove // Fall back on calling runtime·memmove. + // } + // + // The C++ snappy code calls this TryFastAppend. It also checks len(src)-s + // against 21 instead of 16, because it cannot assume that all of its input + // is contiguous in memory and so it needs to leave enough source bytes to + // read the next tag without refilling buffers, but Go's Decode assumes + // contiguousness (the src argument is a []byte). + CMPQ CX, $16 + JGT callMemmove + CMPQ AX, $16 + JLT callMemmove + CMPQ BX, $16 + JLT callMemmove + + // !!! Implement the copy from src to dst as a 16-byte load and store. + // (Decode's documentation says that dst and src must not overlap.) + // + // This always copies 16 bytes, instead of only length bytes, but that's + // OK. If the input is a valid Snappy encoding then subsequent iterations + // will fix up the overrun. Otherwise, Decode returns a nil []byte (and a + // non-nil error), so the overrun will be ignored. + // + // Note that on amd64, it is legal and cheap to issue unaligned 8-byte or + // 16-byte loads and stores. This technique probably wouldn't be as + // effective on architectures that are fussier about alignment. + MOVOU 0(SI), X0 + MOVOU X0, 0(DI) + + // d += length + // s += length + ADDQ CX, DI + ADDQ CX, SI + JMP loop + +callMemmove: + // if length > len(dst)-d || length > len(src)-s { etc } + CMPQ CX, AX + JGT errCorrupt + CMPQ CX, BX + JGT errCorrupt + + // copy(dst[d:], src[s:s+length]) + // + // This means calling runtime·memmove(&dst[d], &src[s], length), so we push + // DI, SI and CX as arguments. Coincidentally, we also need to spill those + // three registers to the stack, to save local variables across the CALL. + MOVQ DI, 0(SP) + MOVQ SI, 8(SP) + MOVQ CX, 16(SP) + MOVQ DI, 24(SP) + MOVQ SI, 32(SP) + MOVQ CX, 40(SP) + CALL runtime·memmove(SB) + + // Restore local variables: unspill registers from the stack and + // re-calculate R8-R13. + MOVQ 24(SP), DI + MOVQ 32(SP), SI + MOVQ 40(SP), CX + MOVQ dst_base+0(FP), R8 + MOVQ dst_len+8(FP), R9 + MOVQ R8, R10 + ADDQ R9, R10 + MOVQ src_base+24(FP), R11 + MOVQ src_len+32(FP), R12 + MOVQ R11, R13 + ADDQ R12, R13 + + // d += length + // s += length + ADDQ CX, DI + ADDQ CX, SI + JMP loop + +tagLit60Plus: + // !!! This fragment does the + // + // s += x - 58; if uint(s) > uint(len(src)) { etc } + // + // checks. In the asm version, we code it once instead of once per switch case. + ADDQ CX, SI + SUBQ $58, SI + MOVQ SI, BX + SUBQ R11, BX + CMPQ BX, R12 + JA errCorrupt + + // case x == 60: + CMPL CX, $61 + JEQ tagLit61 + JA tagLit62Plus + + // x = uint32(src[s-1]) + MOVBLZX -1(SI), CX + JMP doLit + +tagLit61: + // case x == 61: + // x = uint32(src[s-2]) | uint32(src[s-1])<<8 + MOVWLZX -2(SI), CX + JMP doLit + +tagLit62Plus: + CMPL CX, $62 + JA tagLit63 + + // case x == 62: + // x = uint32(src[s-3]) | uint32(src[s-2])<<8 | uint32(src[s-1])<<16 + MOVWLZX -3(SI), CX + MOVBLZX -1(SI), BX + SHLL $16, BX + ORL BX, CX + JMP doLit + +tagLit63: + // case x == 63: + // x = uint32(src[s-4]) | uint32(src[s-3])<<8 | uint32(src[s-2])<<16 | uint32(src[s-1])<<24 + MOVL -4(SI), CX + JMP doLit + +// The code above handles literal tags. +// ---------------------------------------- +// The code below handles copy tags. + +tagCopy4: + // case tagCopy4: + // s += 5 + ADDQ $5, SI + + // if uint(s) > uint(len(src)) { etc } + MOVQ SI, BX + SUBQ R11, BX + CMPQ BX, R12 + JA errCorrupt + + // length = 1 + int(src[s-5])>>2 + SHRQ $2, CX + INCQ CX + + // offset = int(uint32(src[s-4]) | uint32(src[s-3])<<8 | uint32(src[s-2])<<16 | uint32(src[s-1])<<24) + MOVLQZX -4(SI), DX + JMP doCopy + +tagCopy2: + // case tagCopy2: + // s += 3 + ADDQ $3, SI + + // if uint(s) > uint(len(src)) { etc } + MOVQ SI, BX + SUBQ R11, BX + CMPQ BX, R12 + JA errCorrupt + + // length = 1 + int(src[s-3])>>2 + SHRQ $2, CX + INCQ CX + + // offset = int(uint32(src[s-2]) | uint32(src[s-1])<<8) + MOVWQZX -2(SI), DX + JMP doCopy + +tagCopy: + // We have a copy tag. We assume that: + // - BX == src[s] & 0x03 + // - CX == src[s] + CMPQ BX, $2 + JEQ tagCopy2 + JA tagCopy4 + + // case tagCopy1: + // s += 2 + ADDQ $2, SI + + // if uint(s) > uint(len(src)) { etc } + MOVQ SI, BX + SUBQ R11, BX + CMPQ BX, R12 + JA errCorrupt + + // offset = int(uint32(src[s-2])&0xe0<<3 | uint32(src[s-1])) + MOVQ CX, DX + ANDQ $0xe0, DX + SHLQ $3, DX + MOVBQZX -1(SI), BX + ORQ BX, DX + + // length = 4 + int(src[s-2])>>2&0x7 + SHRQ $2, CX + ANDQ $7, CX + ADDQ $4, CX + +doCopy: + // This is the end of the outer "switch", when we have a copy tag. + // + // We assume that: + // - CX == length && CX > 0 + // - DX == offset + + // if offset <= 0 { etc } + CMPQ DX, $0 + JLE errCorrupt + + // if d < offset { etc } + MOVQ DI, BX + SUBQ R8, BX + CMPQ BX, DX + JLT errCorrupt + + // if length > len(dst)-d { etc } + MOVQ R10, BX + SUBQ DI, BX + CMPQ CX, BX + JGT errCorrupt + + // forwardCopy(dst[d:d+length], dst[d-offset:]); d += length + // + // Set: + // - R14 = len(dst)-d + // - R15 = &dst[d-offset] + MOVQ R10, R14 + SUBQ DI, R14 + MOVQ DI, R15 + SUBQ DX, R15 + + // !!! Try a faster technique for short (16 or fewer bytes) forward copies. + // + // First, try using two 8-byte load/stores, similar to the doLit technique + // above. Even if dst[d:d+length] and dst[d-offset:] can overlap, this is + // still OK if offset >= 8. Note that this has to be two 8-byte load/stores + // and not one 16-byte load/store, and the first store has to be before the + // second load, due to the overlap if offset is in the range [8, 16). + // + // if length > 16 || offset < 8 || len(dst)-d < 16 { + // goto slowForwardCopy + // } + // copy 16 bytes + // d += length + CMPQ CX, $16 + JGT slowForwardCopy + CMPQ DX, $8 + JLT slowForwardCopy + CMPQ R14, $16 + JLT slowForwardCopy + MOVQ 0(R15), AX + MOVQ AX, 0(DI) + MOVQ 8(R15), BX + MOVQ BX, 8(DI) + ADDQ CX, DI + JMP loop + +slowForwardCopy: + // !!! If the forward copy is longer than 16 bytes, or if offset < 8, we + // can still try 8-byte load stores, provided we can overrun up to 10 extra + // bytes. As above, the overrun will be fixed up by subsequent iterations + // of the outermost loop. + // + // The C++ snappy code calls this technique IncrementalCopyFastPath. Its + // commentary says: + // + // ---- + // + // The main part of this loop is a simple copy of eight bytes at a time + // until we've copied (at least) the requested amount of bytes. However, + // if d and d-offset are less than eight bytes apart (indicating a + // repeating pattern of length < 8), we first need to expand the pattern in + // order to get the correct results. For instance, if the buffer looks like + // this, with the eight-byte and patterns marked as + // intervals: + // + // abxxxxxxxxxxxx + // [------] d-offset + // [------] d + // + // a single eight-byte copy from to will repeat the pattern + // once, after which we can move two bytes without moving : + // + // ababxxxxxxxxxx + // [------] d-offset + // [------] d + // + // and repeat the exercise until the two no longer overlap. + // + // This allows us to do very well in the special case of one single byte + // repeated many times, without taking a big hit for more general cases. + // + // The worst case of extra writing past the end of the match occurs when + // offset == 1 and length == 1; the last copy will read from byte positions + // [0..7] and write to [4..11], whereas it was only supposed to write to + // position 1. Thus, ten excess bytes. + // + // ---- + // + // That "10 byte overrun" worst case is confirmed by Go's + // TestSlowForwardCopyOverrun, which also tests the fixUpSlowForwardCopy + // and finishSlowForwardCopy algorithm. + // + // if length > len(dst)-d-10 { + // goto verySlowForwardCopy + // } + SUBQ $10, R14 + CMPQ CX, R14 + JGT verySlowForwardCopy + +makeOffsetAtLeast8: + // !!! As above, expand the pattern so that offset >= 8 and we can use + // 8-byte load/stores. + // + // for offset < 8 { + // copy 8 bytes from dst[d-offset:] to dst[d:] + // length -= offset + // d += offset + // offset += offset + // // The two previous lines together means that d-offset, and therefore + // // R15, is unchanged. + // } + CMPQ DX, $8 + JGE fixUpSlowForwardCopy + MOVQ (R15), BX + MOVQ BX, (DI) + SUBQ DX, CX + ADDQ DX, DI + ADDQ DX, DX + JMP makeOffsetAtLeast8 + +fixUpSlowForwardCopy: + // !!! Add length (which might be negative now) to d (implied by DI being + // &dst[d]) so that d ends up at the right place when we jump back to the + // top of the loop. Before we do that, though, we save DI to AX so that, if + // length is positive, copying the remaining length bytes will write to the + // right place. + MOVQ DI, AX + ADDQ CX, DI + +finishSlowForwardCopy: + // !!! Repeat 8-byte load/stores until length <= 0. Ending with a negative + // length means that we overrun, but as above, that will be fixed up by + // subsequent iterations of the outermost loop. + CMPQ CX, $0 + JLE loop + MOVQ (R15), BX + MOVQ BX, (AX) + ADDQ $8, R15 + ADDQ $8, AX + SUBQ $8, CX + JMP finishSlowForwardCopy + +verySlowForwardCopy: + // verySlowForwardCopy is a simple implementation of forward copy. In C + // parlance, this is a do/while loop instead of a while loop, since we know + // that length > 0. In Go syntax: + // + // for { + // dst[d] = dst[d - offset] + // d++ + // length-- + // if length == 0 { + // break + // } + // } + MOVB (R15), BX + MOVB BX, (DI) + INCQ R15 + INCQ DI + DECQ CX + JNZ verySlowForwardCopy + JMP loop + +// The code above handles copy tags. +// ---------------------------------------- + +end: + // This is the end of the "for s < len(src)". + // + // if d != len(dst) { etc } + CMPQ DI, R10 + JNE errCorrupt + + // return 0 + MOVQ $0, ret+48(FP) + RET + +errCorrupt: + // return decodeErrCodeCorrupt + MOVQ $1, ret+48(FP) + RET diff --git a/vendor/github.com/golang/snappy/decode_other.go b/vendor/github.com/golang/snappy/decode_other.go new file mode 100644 index 00000000..8c9f2049 --- /dev/null +++ b/vendor/github.com/golang/snappy/decode_other.go @@ -0,0 +1,101 @@ +// Copyright 2016 The Snappy-Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build !amd64 appengine !gc noasm + +package snappy + +// decode writes the decoding of src to dst. It assumes that the varint-encoded +// length of the decompressed bytes has already been read, and that len(dst) +// equals that length. +// +// It returns 0 on success or a decodeErrCodeXxx error code on failure. +func decode(dst, src []byte) int { + var d, s, offset, length int + for s < len(src) { + switch src[s] & 0x03 { + case tagLiteral: + x := uint32(src[s] >> 2) + switch { + case x < 60: + s++ + case x == 60: + s += 2 + if uint(s) > uint(len(src)) { // The uint conversions catch overflow from the previous line. + return decodeErrCodeCorrupt + } + x = uint32(src[s-1]) + case x == 61: + s += 3 + if uint(s) > uint(len(src)) { // The uint conversions catch overflow from the previous line. + return decodeErrCodeCorrupt + } + x = uint32(src[s-2]) | uint32(src[s-1])<<8 + case x == 62: + s += 4 + if uint(s) > uint(len(src)) { // The uint conversions catch overflow from the previous line. + return decodeErrCodeCorrupt + } + x = uint32(src[s-3]) | uint32(src[s-2])<<8 | uint32(src[s-1])<<16 + case x == 63: + s += 5 + if uint(s) > uint(len(src)) { // The uint conversions catch overflow from the previous line. + return decodeErrCodeCorrupt + } + x = uint32(src[s-4]) | uint32(src[s-3])<<8 | uint32(src[s-2])<<16 | uint32(src[s-1])<<24 + } + length = int(x) + 1 + if length <= 0 { + return decodeErrCodeUnsupportedLiteralLength + } + if length > len(dst)-d || length > len(src)-s { + return decodeErrCodeCorrupt + } + copy(dst[d:], src[s:s+length]) + d += length + s += length + continue + + case tagCopy1: + s += 2 + if uint(s) > uint(len(src)) { // The uint conversions catch overflow from the previous line. + return decodeErrCodeCorrupt + } + length = 4 + int(src[s-2])>>2&0x7 + offset = int(uint32(src[s-2])&0xe0<<3 | uint32(src[s-1])) + + case tagCopy2: + s += 3 + if uint(s) > uint(len(src)) { // The uint conversions catch overflow from the previous line. + return decodeErrCodeCorrupt + } + length = 1 + int(src[s-3])>>2 + offset = int(uint32(src[s-2]) | uint32(src[s-1])<<8) + + case tagCopy4: + s += 5 + if uint(s) > uint(len(src)) { // The uint conversions catch overflow from the previous line. + return decodeErrCodeCorrupt + } + length = 1 + int(src[s-5])>>2 + offset = int(uint32(src[s-4]) | uint32(src[s-3])<<8 | uint32(src[s-2])<<16 | uint32(src[s-1])<<24) + } + + if offset <= 0 || d < offset || length > len(dst)-d { + return decodeErrCodeCorrupt + } + // Copy from an earlier sub-slice of dst to a later sub-slice. Unlike + // the built-in copy function, this byte-by-byte copy always runs + // forwards, even if the slices overlap. Conceptually, this is: + // + // d += forwardCopy(dst[d:d+length], dst[d-offset:]) + for end := d + length; d != end; d++ { + dst[d] = dst[d-offset] + } + } + if d != len(dst) { + return decodeErrCodeCorrupt + } + return 0 +} diff --git a/vendor/github.com/golang/snappy/encode.go b/vendor/github.com/golang/snappy/encode.go new file mode 100644 index 00000000..87496890 --- /dev/null +++ b/vendor/github.com/golang/snappy/encode.go @@ -0,0 +1,285 @@ +// Copyright 2011 The Snappy-Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package snappy + +import ( + "encoding/binary" + "errors" + "io" +) + +// Encode returns the encoded form of src. The returned slice may be a sub- +// slice of dst if dst was large enough to hold the entire encoded block. +// Otherwise, a newly allocated slice will be returned. +// +// The dst and src must not overlap. It is valid to pass a nil dst. +func Encode(dst, src []byte) []byte { + if n := MaxEncodedLen(len(src)); n < 0 { + panic(ErrTooLarge) + } else if len(dst) < n { + dst = make([]byte, n) + } + + // The block starts with the varint-encoded length of the decompressed bytes. + d := binary.PutUvarint(dst, uint64(len(src))) + + for len(src) > 0 { + p := src + src = nil + if len(p) > maxBlockSize { + p, src = p[:maxBlockSize], p[maxBlockSize:] + } + if len(p) < minNonLiteralBlockSize { + d += emitLiteral(dst[d:], p) + } else { + d += encodeBlock(dst[d:], p) + } + } + return dst[:d] +} + +// inputMargin is the minimum number of extra input bytes to keep, inside +// encodeBlock's inner loop. On some architectures, this margin lets us +// implement a fast path for emitLiteral, where the copy of short (<= 16 byte) +// literals can be implemented as a single load to and store from a 16-byte +// register. That literal's actual length can be as short as 1 byte, so this +// can copy up to 15 bytes too much, but that's OK as subsequent iterations of +// the encoding loop will fix up the copy overrun, and this inputMargin ensures +// that we don't overrun the dst and src buffers. +const inputMargin = 16 - 1 + +// minNonLiteralBlockSize is the minimum size of the input to encodeBlock that +// could be encoded with a copy tag. This is the minimum with respect to the +// algorithm used by encodeBlock, not a minimum enforced by the file format. +// +// The encoded output must start with at least a 1 byte literal, as there are +// no previous bytes to copy. A minimal (1 byte) copy after that, generated +// from an emitCopy call in encodeBlock's main loop, would require at least +// another inputMargin bytes, for the reason above: we want any emitLiteral +// calls inside encodeBlock's main loop to use the fast path if possible, which +// requires being able to overrun by inputMargin bytes. Thus, +// minNonLiteralBlockSize equals 1 + 1 + inputMargin. +// +// The C++ code doesn't use this exact threshold, but it could, as discussed at +// https://groups.google.com/d/topic/snappy-compression/oGbhsdIJSJ8/discussion +// The difference between Go (2+inputMargin) and C++ (inputMargin) is purely an +// optimization. It should not affect the encoded form. This is tested by +// TestSameEncodingAsCppShortCopies. +const minNonLiteralBlockSize = 1 + 1 + inputMargin + +// MaxEncodedLen returns the maximum length of a snappy block, given its +// uncompressed length. +// +// It will return a negative value if srcLen is too large to encode. +func MaxEncodedLen(srcLen int) int { + n := uint64(srcLen) + if n > 0xffffffff { + return -1 + } + // Compressed data can be defined as: + // compressed := item* literal* + // item := literal* copy + // + // The trailing literal sequence has a space blowup of at most 62/60 + // since a literal of length 60 needs one tag byte + one extra byte + // for length information. + // + // Item blowup is trickier to measure. Suppose the "copy" op copies + // 4 bytes of data. Because of a special check in the encoding code, + // we produce a 4-byte copy only if the offset is < 65536. Therefore + // the copy op takes 3 bytes to encode, and this type of item leads + // to at most the 62/60 blowup for representing literals. + // + // Suppose the "copy" op copies 5 bytes of data. If the offset is big + // enough, it will take 5 bytes to encode the copy op. Therefore the + // worst case here is a one-byte literal followed by a five-byte copy. + // That is, 6 bytes of input turn into 7 bytes of "compressed" data. + // + // This last factor dominates the blowup, so the final estimate is: + n = 32 + n + n/6 + if n > 0xffffffff { + return -1 + } + return int(n) +} + +var errClosed = errors.New("snappy: Writer is closed") + +// NewWriter returns a new Writer that compresses to w. +// +// The Writer returned does not buffer writes. There is no need to Flush or +// Close such a Writer. +// +// Deprecated: the Writer returned is not suitable for many small writes, only +// for few large writes. Use NewBufferedWriter instead, which is efficient +// regardless of the frequency and shape of the writes, and remember to Close +// that Writer when done. +func NewWriter(w io.Writer) *Writer { + return &Writer{ + w: w, + obuf: make([]byte, obufLen), + } +} + +// NewBufferedWriter returns a new Writer that compresses to w, using the +// framing format described at +// https://github.com/google/snappy/blob/master/framing_format.txt +// +// The Writer returned buffers writes. Users must call Close to guarantee all +// data has been forwarded to the underlying io.Writer. They may also call +// Flush zero or more times before calling Close. +func NewBufferedWriter(w io.Writer) *Writer { + return &Writer{ + w: w, + ibuf: make([]byte, 0, maxBlockSize), + obuf: make([]byte, obufLen), + } +} + +// Writer is an io.Writer than can write Snappy-compressed bytes. +type Writer struct { + w io.Writer + err error + + // ibuf is a buffer for the incoming (uncompressed) bytes. + // + // Its use is optional. For backwards compatibility, Writers created by the + // NewWriter function have ibuf == nil, do not buffer incoming bytes, and + // therefore do not need to be Flush'ed or Close'd. + ibuf []byte + + // obuf is a buffer for the outgoing (compressed) bytes. + obuf []byte + + // wroteStreamHeader is whether we have written the stream header. + wroteStreamHeader bool +} + +// Reset discards the writer's state and switches the Snappy writer to write to +// w. This permits reusing a Writer rather than allocating a new one. +func (w *Writer) Reset(writer io.Writer) { + w.w = writer + w.err = nil + if w.ibuf != nil { + w.ibuf = w.ibuf[:0] + } + w.wroteStreamHeader = false +} + +// Write satisfies the io.Writer interface. +func (w *Writer) Write(p []byte) (nRet int, errRet error) { + if w.ibuf == nil { + // Do not buffer incoming bytes. This does not perform or compress well + // if the caller of Writer.Write writes many small slices. This + // behavior is therefore deprecated, but still supported for backwards + // compatibility with code that doesn't explicitly Flush or Close. + return w.write(p) + } + + // The remainder of this method is based on bufio.Writer.Write from the + // standard library. + + for len(p) > (cap(w.ibuf)-len(w.ibuf)) && w.err == nil { + var n int + if len(w.ibuf) == 0 { + // Large write, empty buffer. + // Write directly from p to avoid copy. + n, _ = w.write(p) + } else { + n = copy(w.ibuf[len(w.ibuf):cap(w.ibuf)], p) + w.ibuf = w.ibuf[:len(w.ibuf)+n] + w.Flush() + } + nRet += n + p = p[n:] + } + if w.err != nil { + return nRet, w.err + } + n := copy(w.ibuf[len(w.ibuf):cap(w.ibuf)], p) + w.ibuf = w.ibuf[:len(w.ibuf)+n] + nRet += n + return nRet, nil +} + +func (w *Writer) write(p []byte) (nRet int, errRet error) { + if w.err != nil { + return 0, w.err + } + for len(p) > 0 { + obufStart := len(magicChunk) + if !w.wroteStreamHeader { + w.wroteStreamHeader = true + copy(w.obuf, magicChunk) + obufStart = 0 + } + + var uncompressed []byte + if len(p) > maxBlockSize { + uncompressed, p = p[:maxBlockSize], p[maxBlockSize:] + } else { + uncompressed, p = p, nil + } + checksum := crc(uncompressed) + + // Compress the buffer, discarding the result if the improvement + // isn't at least 12.5%. + compressed := Encode(w.obuf[obufHeaderLen:], uncompressed) + chunkType := uint8(chunkTypeCompressedData) + chunkLen := 4 + len(compressed) + obufEnd := obufHeaderLen + len(compressed) + if len(compressed) >= len(uncompressed)-len(uncompressed)/8 { + chunkType = chunkTypeUncompressedData + chunkLen = 4 + len(uncompressed) + obufEnd = obufHeaderLen + } + + // Fill in the per-chunk header that comes before the body. + w.obuf[len(magicChunk)+0] = chunkType + w.obuf[len(magicChunk)+1] = uint8(chunkLen >> 0) + w.obuf[len(magicChunk)+2] = uint8(chunkLen >> 8) + w.obuf[len(magicChunk)+3] = uint8(chunkLen >> 16) + w.obuf[len(magicChunk)+4] = uint8(checksum >> 0) + w.obuf[len(magicChunk)+5] = uint8(checksum >> 8) + w.obuf[len(magicChunk)+6] = uint8(checksum >> 16) + w.obuf[len(magicChunk)+7] = uint8(checksum >> 24) + + if _, err := w.w.Write(w.obuf[obufStart:obufEnd]); err != nil { + w.err = err + return nRet, err + } + if chunkType == chunkTypeUncompressedData { + if _, err := w.w.Write(uncompressed); err != nil { + w.err = err + return nRet, err + } + } + nRet += len(uncompressed) + } + return nRet, nil +} + +// Flush flushes the Writer to its underlying io.Writer. +func (w *Writer) Flush() error { + if w.err != nil { + return w.err + } + if len(w.ibuf) == 0 { + return nil + } + w.write(w.ibuf) + w.ibuf = w.ibuf[:0] + return w.err +} + +// Close calls Flush and then closes the Writer. +func (w *Writer) Close() error { + w.Flush() + ret := w.err + if w.err == nil { + w.err = errClosed + } + return ret +} diff --git a/vendor/github.com/golang/snappy/encode_amd64.go b/vendor/github.com/golang/snappy/encode_amd64.go new file mode 100644 index 00000000..2a56fb50 --- /dev/null +++ b/vendor/github.com/golang/snappy/encode_amd64.go @@ -0,0 +1,29 @@ +// Copyright 2016 The Snappy-Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build !appengine +// +build gc +// +build !noasm + +package snappy + +// emitLiteral has the same semantics as in encode_other.go. +// +//go:noescape +func emitLiteral(dst, lit []byte) int + +// emitCopy has the same semantics as in encode_other.go. +// +//go:noescape +func emitCopy(dst []byte, offset, length int) int + +// extendMatch has the same semantics as in encode_other.go. +// +//go:noescape +func extendMatch(src []byte, i, j int) int + +// encodeBlock has the same semantics as in encode_other.go. +// +//go:noescape +func encodeBlock(dst, src []byte) (d int) \ No newline at end of file diff --git a/vendor/github.com/golang/snappy/encode_amd64.s b/vendor/github.com/golang/snappy/encode_amd64.s new file mode 100644 index 00000000..adfd979f --- /dev/null +++ b/vendor/github.com/golang/snappy/encode_amd64.s @@ -0,0 +1,730 @@ +// Copyright 2016 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build !appengine +// +build gc +// +build !noasm + +#include "textflag.h" + +// The XXX lines assemble on Go 1.4, 1.5 and 1.7, but not 1.6, due to a +// Go toolchain regression. See https://github.com/golang/go/issues/15426 and +// https://github.com/golang/snappy/issues/29 +// +// As a workaround, the package was built with a known good assembler, and +// those instructions were disassembled by "objdump -d" to yield the +// 4e 0f b7 7c 5c 78 movzwq 0x78(%rsp,%r11,2),%r15 +// style comments, in AT&T asm syntax. Note that rsp here is a physical +// register, not Go/asm's SP pseudo-register (see https://golang.org/doc/asm). +// The instructions were then encoded as "BYTE $0x.." sequences, which assemble +// fine on Go 1.6. + +// The asm code generally follows the pure Go code in encode_other.go, except +// where marked with a "!!!". + +// ---------------------------------------------------------------------------- + +// func emitLiteral(dst, lit []byte) int +// +// All local variables fit into registers. The register allocation: +// - AX len(lit) +// - BX n +// - DX return value +// - DI &dst[i] +// - R10 &lit[0] +// +// The 24 bytes of stack space is to call runtime·memmove. +// +// The unusual register allocation of local variables, such as R10 for the +// source pointer, matches the allocation used at the call site in encodeBlock, +// which makes it easier to manually inline this function. +TEXT ·emitLiteral(SB), NOSPLIT, $24-56 + MOVQ dst_base+0(FP), DI + MOVQ lit_base+24(FP), R10 + MOVQ lit_len+32(FP), AX + MOVQ AX, DX + MOVL AX, BX + SUBL $1, BX + + CMPL BX, $60 + JLT oneByte + CMPL BX, $256 + JLT twoBytes + +threeBytes: + MOVB $0xf4, 0(DI) + MOVW BX, 1(DI) + ADDQ $3, DI + ADDQ $3, DX + JMP memmove + +twoBytes: + MOVB $0xf0, 0(DI) + MOVB BX, 1(DI) + ADDQ $2, DI + ADDQ $2, DX + JMP memmove + +oneByte: + SHLB $2, BX + MOVB BX, 0(DI) + ADDQ $1, DI + ADDQ $1, DX + +memmove: + MOVQ DX, ret+48(FP) + + // copy(dst[i:], lit) + // + // This means calling runtime·memmove(&dst[i], &lit[0], len(lit)), so we push + // DI, R10 and AX as arguments. + MOVQ DI, 0(SP) + MOVQ R10, 8(SP) + MOVQ AX, 16(SP) + CALL runtime·memmove(SB) + RET + +// ---------------------------------------------------------------------------- + +// func emitCopy(dst []byte, offset, length int) int +// +// All local variables fit into registers. The register allocation: +// - AX length +// - SI &dst[0] +// - DI &dst[i] +// - R11 offset +// +// The unusual register allocation of local variables, such as R11 for the +// offset, matches the allocation used at the call site in encodeBlock, which +// makes it easier to manually inline this function. +TEXT ·emitCopy(SB), NOSPLIT, $0-48 + MOVQ dst_base+0(FP), DI + MOVQ DI, SI + MOVQ offset+24(FP), R11 + MOVQ length+32(FP), AX + +loop0: + // for length >= 68 { etc } + CMPL AX, $68 + JLT step1 + + // Emit a length 64 copy, encoded as 3 bytes. + MOVB $0xfe, 0(DI) + MOVW R11, 1(DI) + ADDQ $3, DI + SUBL $64, AX + JMP loop0 + +step1: + // if length > 64 { etc } + CMPL AX, $64 + JLE step2 + + // Emit a length 60 copy, encoded as 3 bytes. + MOVB $0xee, 0(DI) + MOVW R11, 1(DI) + ADDQ $3, DI + SUBL $60, AX + +step2: + // if length >= 12 || offset >= 2048 { goto step3 } + CMPL AX, $12 + JGE step3 + CMPL R11, $2048 + JGE step3 + + // Emit the remaining copy, encoded as 2 bytes. + MOVB R11, 1(DI) + SHRL $8, R11 + SHLB $5, R11 + SUBB $4, AX + SHLB $2, AX + ORB AX, R11 + ORB $1, R11 + MOVB R11, 0(DI) + ADDQ $2, DI + + // Return the number of bytes written. + SUBQ SI, DI + MOVQ DI, ret+40(FP) + RET + +step3: + // Emit the remaining copy, encoded as 3 bytes. + SUBL $1, AX + SHLB $2, AX + ORB $2, AX + MOVB AX, 0(DI) + MOVW R11, 1(DI) + ADDQ $3, DI + + // Return the number of bytes written. + SUBQ SI, DI + MOVQ DI, ret+40(FP) + RET + +// ---------------------------------------------------------------------------- + +// func extendMatch(src []byte, i, j int) int +// +// All local variables fit into registers. The register allocation: +// - DX &src[0] +// - SI &src[j] +// - R13 &src[len(src) - 8] +// - R14 &src[len(src)] +// - R15 &src[i] +// +// The unusual register allocation of local variables, such as R15 for a source +// pointer, matches the allocation used at the call site in encodeBlock, which +// makes it easier to manually inline this function. +TEXT ·extendMatch(SB), NOSPLIT, $0-48 + MOVQ src_base+0(FP), DX + MOVQ src_len+8(FP), R14 + MOVQ i+24(FP), R15 + MOVQ j+32(FP), SI + ADDQ DX, R14 + ADDQ DX, R15 + ADDQ DX, SI + MOVQ R14, R13 + SUBQ $8, R13 + +cmp8: + // As long as we are 8 or more bytes before the end of src, we can load and + // compare 8 bytes at a time. If those 8 bytes are equal, repeat. + CMPQ SI, R13 + JA cmp1 + MOVQ (R15), AX + MOVQ (SI), BX + CMPQ AX, BX + JNE bsf + ADDQ $8, R15 + ADDQ $8, SI + JMP cmp8 + +bsf: + // If those 8 bytes were not equal, XOR the two 8 byte values, and return + // the index of the first byte that differs. The BSF instruction finds the + // least significant 1 bit, the amd64 architecture is little-endian, and + // the shift by 3 converts a bit index to a byte index. + XORQ AX, BX + BSFQ BX, BX + SHRQ $3, BX + ADDQ BX, SI + + // Convert from &src[ret] to ret. + SUBQ DX, SI + MOVQ SI, ret+40(FP) + RET + +cmp1: + // In src's tail, compare 1 byte at a time. + CMPQ SI, R14 + JAE extendMatchEnd + MOVB (R15), AX + MOVB (SI), BX + CMPB AX, BX + JNE extendMatchEnd + ADDQ $1, R15 + ADDQ $1, SI + JMP cmp1 + +extendMatchEnd: + // Convert from &src[ret] to ret. + SUBQ DX, SI + MOVQ SI, ret+40(FP) + RET + +// ---------------------------------------------------------------------------- + +// func encodeBlock(dst, src []byte) (d int) +// +// All local variables fit into registers, other than "var table". The register +// allocation: +// - AX . . +// - BX . . +// - CX 56 shift (note that amd64 shifts by non-immediates must use CX). +// - DX 64 &src[0], tableSize +// - SI 72 &src[s] +// - DI 80 &dst[d] +// - R9 88 sLimit +// - R10 . &src[nextEmit] +// - R11 96 prevHash, currHash, nextHash, offset +// - R12 104 &src[base], skip +// - R13 . &src[nextS], &src[len(src) - 8] +// - R14 . len(src), bytesBetweenHashLookups, &src[len(src)], x +// - R15 112 candidate +// +// The second column (56, 64, etc) is the stack offset to spill the registers +// when calling other functions. We could pack this slightly tighter, but it's +// simpler to have a dedicated spill map independent of the function called. +// +// "var table [maxTableSize]uint16" takes up 32768 bytes of stack space. An +// extra 56 bytes, to call other functions, and an extra 64 bytes, to spill +// local variables (registers) during calls gives 32768 + 56 + 64 = 32888. +TEXT ·encodeBlock(SB), 0, $32888-56 + MOVQ dst_base+0(FP), DI + MOVQ src_base+24(FP), SI + MOVQ src_len+32(FP), R14 + + // shift, tableSize := uint32(32-8), 1<<8 + MOVQ $24, CX + MOVQ $256, DX + +calcShift: + // for ; tableSize < maxTableSize && tableSize < len(src); tableSize *= 2 { + // shift-- + // } + CMPQ DX, $16384 + JGE varTable + CMPQ DX, R14 + JGE varTable + SUBQ $1, CX + SHLQ $1, DX + JMP calcShift + +varTable: + // var table [maxTableSize]uint16 + // + // In the asm code, unlike the Go code, we can zero-initialize only the + // first tableSize elements. Each uint16 element is 2 bytes and each MOVOU + // writes 16 bytes, so we can do only tableSize/8 writes instead of the + // 2048 writes that would zero-initialize all of table's 32768 bytes. + SHRQ $3, DX + LEAQ table-32768(SP), BX + PXOR X0, X0 + +memclr: + MOVOU X0, 0(BX) + ADDQ $16, BX + SUBQ $1, DX + JNZ memclr + + // !!! DX = &src[0] + MOVQ SI, DX + + // sLimit := len(src) - inputMargin + MOVQ R14, R9 + SUBQ $15, R9 + + // !!! Pre-emptively spill CX, DX and R9 to the stack. Their values don't + // change for the rest of the function. + MOVQ CX, 56(SP) + MOVQ DX, 64(SP) + MOVQ R9, 88(SP) + + // nextEmit := 0 + MOVQ DX, R10 + + // s := 1 + ADDQ $1, SI + + // nextHash := hash(load32(src, s), shift) + MOVL 0(SI), R11 + IMULL $0x1e35a7bd, R11 + SHRL CX, R11 + +outer: + // for { etc } + + // skip := 32 + MOVQ $32, R12 + + // nextS := s + MOVQ SI, R13 + + // candidate := 0 + MOVQ $0, R15 + +inner0: + // for { etc } + + // s := nextS + MOVQ R13, SI + + // bytesBetweenHashLookups := skip >> 5 + MOVQ R12, R14 + SHRQ $5, R14 + + // nextS = s + bytesBetweenHashLookups + ADDQ R14, R13 + + // skip += bytesBetweenHashLookups + ADDQ R14, R12 + + // if nextS > sLimit { goto emitRemainder } + MOVQ R13, AX + SUBQ DX, AX + CMPQ AX, R9 + JA emitRemainder + + // candidate = int(table[nextHash]) + // XXX: MOVWQZX table-32768(SP)(R11*2), R15 + // XXX: 4e 0f b7 7c 5c 78 movzwq 0x78(%rsp,%r11,2),%r15 + BYTE $0x4e + BYTE $0x0f + BYTE $0xb7 + BYTE $0x7c + BYTE $0x5c + BYTE $0x78 + + // table[nextHash] = uint16(s) + MOVQ SI, AX + SUBQ DX, AX + + // XXX: MOVW AX, table-32768(SP)(R11*2) + // XXX: 66 42 89 44 5c 78 mov %ax,0x78(%rsp,%r11,2) + BYTE $0x66 + BYTE $0x42 + BYTE $0x89 + BYTE $0x44 + BYTE $0x5c + BYTE $0x78 + + // nextHash = hash(load32(src, nextS), shift) + MOVL 0(R13), R11 + IMULL $0x1e35a7bd, R11 + SHRL CX, R11 + + // if load32(src, s) != load32(src, candidate) { continue } break + MOVL 0(SI), AX + MOVL (DX)(R15*1), BX + CMPL AX, BX + JNE inner0 + +fourByteMatch: + // As per the encode_other.go code: + // + // A 4-byte match has been found. We'll later see etc. + + // !!! Jump to a fast path for short (<= 16 byte) literals. See the comment + // on inputMargin in encode.go. + MOVQ SI, AX + SUBQ R10, AX + CMPQ AX, $16 + JLE emitLiteralFastPath + + // ---------------------------------------- + // Begin inline of the emitLiteral call. + // + // d += emitLiteral(dst[d:], src[nextEmit:s]) + + MOVL AX, BX + SUBL $1, BX + + CMPL BX, $60 + JLT inlineEmitLiteralOneByte + CMPL BX, $256 + JLT inlineEmitLiteralTwoBytes + +inlineEmitLiteralThreeBytes: + MOVB $0xf4, 0(DI) + MOVW BX, 1(DI) + ADDQ $3, DI + JMP inlineEmitLiteralMemmove + +inlineEmitLiteralTwoBytes: + MOVB $0xf0, 0(DI) + MOVB BX, 1(DI) + ADDQ $2, DI + JMP inlineEmitLiteralMemmove + +inlineEmitLiteralOneByte: + SHLB $2, BX + MOVB BX, 0(DI) + ADDQ $1, DI + +inlineEmitLiteralMemmove: + // Spill local variables (registers) onto the stack; call; unspill. + // + // copy(dst[i:], lit) + // + // This means calling runtime·memmove(&dst[i], &lit[0], len(lit)), so we push + // DI, R10 and AX as arguments. + MOVQ DI, 0(SP) + MOVQ R10, 8(SP) + MOVQ AX, 16(SP) + ADDQ AX, DI // Finish the "d +=" part of "d += emitLiteral(etc)". + MOVQ SI, 72(SP) + MOVQ DI, 80(SP) + MOVQ R15, 112(SP) + CALL runtime·memmove(SB) + MOVQ 56(SP), CX + MOVQ 64(SP), DX + MOVQ 72(SP), SI + MOVQ 80(SP), DI + MOVQ 88(SP), R9 + MOVQ 112(SP), R15 + JMP inner1 + +inlineEmitLiteralEnd: + // End inline of the emitLiteral call. + // ---------------------------------------- + +emitLiteralFastPath: + // !!! Emit the 1-byte encoding "uint8(len(lit)-1)<<2". + MOVB AX, BX + SUBB $1, BX + SHLB $2, BX + MOVB BX, (DI) + ADDQ $1, DI + + // !!! Implement the copy from lit to dst as a 16-byte load and store. + // (Encode's documentation says that dst and src must not overlap.) + // + // This always copies 16 bytes, instead of only len(lit) bytes, but that's + // OK. Subsequent iterations will fix up the overrun. + // + // Note that on amd64, it is legal and cheap to issue unaligned 8-byte or + // 16-byte loads and stores. This technique probably wouldn't be as + // effective on architectures that are fussier about alignment. + MOVOU 0(R10), X0 + MOVOU X0, 0(DI) + ADDQ AX, DI + +inner1: + // for { etc } + + // base := s + MOVQ SI, R12 + + // !!! offset := base - candidate + MOVQ R12, R11 + SUBQ R15, R11 + SUBQ DX, R11 + + // ---------------------------------------- + // Begin inline of the extendMatch call. + // + // s = extendMatch(src, candidate+4, s+4) + + // !!! R14 = &src[len(src)] + MOVQ src_len+32(FP), R14 + ADDQ DX, R14 + + // !!! R13 = &src[len(src) - 8] + MOVQ R14, R13 + SUBQ $8, R13 + + // !!! R15 = &src[candidate + 4] + ADDQ $4, R15 + ADDQ DX, R15 + + // !!! s += 4 + ADDQ $4, SI + +inlineExtendMatchCmp8: + // As long as we are 8 or more bytes before the end of src, we can load and + // compare 8 bytes at a time. If those 8 bytes are equal, repeat. + CMPQ SI, R13 + JA inlineExtendMatchCmp1 + MOVQ (R15), AX + MOVQ (SI), BX + CMPQ AX, BX + JNE inlineExtendMatchBSF + ADDQ $8, R15 + ADDQ $8, SI + JMP inlineExtendMatchCmp8 + +inlineExtendMatchBSF: + // If those 8 bytes were not equal, XOR the two 8 byte values, and return + // the index of the first byte that differs. The BSF instruction finds the + // least significant 1 bit, the amd64 architecture is little-endian, and + // the shift by 3 converts a bit index to a byte index. + XORQ AX, BX + BSFQ BX, BX + SHRQ $3, BX + ADDQ BX, SI + JMP inlineExtendMatchEnd + +inlineExtendMatchCmp1: + // In src's tail, compare 1 byte at a time. + CMPQ SI, R14 + JAE inlineExtendMatchEnd + MOVB (R15), AX + MOVB (SI), BX + CMPB AX, BX + JNE inlineExtendMatchEnd + ADDQ $1, R15 + ADDQ $1, SI + JMP inlineExtendMatchCmp1 + +inlineExtendMatchEnd: + // End inline of the extendMatch call. + // ---------------------------------------- + + // ---------------------------------------- + // Begin inline of the emitCopy call. + // + // d += emitCopy(dst[d:], base-candidate, s-base) + + // !!! length := s - base + MOVQ SI, AX + SUBQ R12, AX + +inlineEmitCopyLoop0: + // for length >= 68 { etc } + CMPL AX, $68 + JLT inlineEmitCopyStep1 + + // Emit a length 64 copy, encoded as 3 bytes. + MOVB $0xfe, 0(DI) + MOVW R11, 1(DI) + ADDQ $3, DI + SUBL $64, AX + JMP inlineEmitCopyLoop0 + +inlineEmitCopyStep1: + // if length > 64 { etc } + CMPL AX, $64 + JLE inlineEmitCopyStep2 + + // Emit a length 60 copy, encoded as 3 bytes. + MOVB $0xee, 0(DI) + MOVW R11, 1(DI) + ADDQ $3, DI + SUBL $60, AX + +inlineEmitCopyStep2: + // if length >= 12 || offset >= 2048 { goto inlineEmitCopyStep3 } + CMPL AX, $12 + JGE inlineEmitCopyStep3 + CMPL R11, $2048 + JGE inlineEmitCopyStep3 + + // Emit the remaining copy, encoded as 2 bytes. + MOVB R11, 1(DI) + SHRL $8, R11 + SHLB $5, R11 + SUBB $4, AX + SHLB $2, AX + ORB AX, R11 + ORB $1, R11 + MOVB R11, 0(DI) + ADDQ $2, DI + JMP inlineEmitCopyEnd + +inlineEmitCopyStep3: + // Emit the remaining copy, encoded as 3 bytes. + SUBL $1, AX + SHLB $2, AX + ORB $2, AX + MOVB AX, 0(DI) + MOVW R11, 1(DI) + ADDQ $3, DI + +inlineEmitCopyEnd: + // End inline of the emitCopy call. + // ---------------------------------------- + + // nextEmit = s + MOVQ SI, R10 + + // if s >= sLimit { goto emitRemainder } + MOVQ SI, AX + SUBQ DX, AX + CMPQ AX, R9 + JAE emitRemainder + + // As per the encode_other.go code: + // + // We could immediately etc. + + // x := load64(src, s-1) + MOVQ -1(SI), R14 + + // prevHash := hash(uint32(x>>0), shift) + MOVL R14, R11 + IMULL $0x1e35a7bd, R11 + SHRL CX, R11 + + // table[prevHash] = uint16(s-1) + MOVQ SI, AX + SUBQ DX, AX + SUBQ $1, AX + + // XXX: MOVW AX, table-32768(SP)(R11*2) + // XXX: 66 42 89 44 5c 78 mov %ax,0x78(%rsp,%r11,2) + BYTE $0x66 + BYTE $0x42 + BYTE $0x89 + BYTE $0x44 + BYTE $0x5c + BYTE $0x78 + + // currHash := hash(uint32(x>>8), shift) + SHRQ $8, R14 + MOVL R14, R11 + IMULL $0x1e35a7bd, R11 + SHRL CX, R11 + + // candidate = int(table[currHash]) + // XXX: MOVWQZX table-32768(SP)(R11*2), R15 + // XXX: 4e 0f b7 7c 5c 78 movzwq 0x78(%rsp,%r11,2),%r15 + BYTE $0x4e + BYTE $0x0f + BYTE $0xb7 + BYTE $0x7c + BYTE $0x5c + BYTE $0x78 + + // table[currHash] = uint16(s) + ADDQ $1, AX + + // XXX: MOVW AX, table-32768(SP)(R11*2) + // XXX: 66 42 89 44 5c 78 mov %ax,0x78(%rsp,%r11,2) + BYTE $0x66 + BYTE $0x42 + BYTE $0x89 + BYTE $0x44 + BYTE $0x5c + BYTE $0x78 + + // if uint32(x>>8) == load32(src, candidate) { continue } + MOVL (DX)(R15*1), BX + CMPL R14, BX + JEQ inner1 + + // nextHash = hash(uint32(x>>16), shift) + SHRQ $8, R14 + MOVL R14, R11 + IMULL $0x1e35a7bd, R11 + SHRL CX, R11 + + // s++ + ADDQ $1, SI + + // break out of the inner1 for loop, i.e. continue the outer loop. + JMP outer + +emitRemainder: + // if nextEmit < len(src) { etc } + MOVQ src_len+32(FP), AX + ADDQ DX, AX + CMPQ R10, AX + JEQ encodeBlockEnd + + // d += emitLiteral(dst[d:], src[nextEmit:]) + // + // Push args. + MOVQ DI, 0(SP) + MOVQ $0, 8(SP) // Unnecessary, as the callee ignores it, but conservative. + MOVQ $0, 16(SP) // Unnecessary, as the callee ignores it, but conservative. + MOVQ R10, 24(SP) + SUBQ R10, AX + MOVQ AX, 32(SP) + MOVQ AX, 40(SP) // Unnecessary, as the callee ignores it, but conservative. + + // Spill local variables (registers) onto the stack; call; unspill. + MOVQ DI, 80(SP) + CALL ·emitLiteral(SB) + MOVQ 80(SP), DI + + // Finish the "d +=" part of "d += emitLiteral(etc)". + ADDQ 48(SP), DI + +encodeBlockEnd: + MOVQ dst_base+0(FP), AX + SUBQ AX, DI + MOVQ DI, d+48(FP) + RET diff --git a/vendor/github.com/golang/snappy/encode_other.go b/vendor/github.com/golang/snappy/encode_other.go new file mode 100644 index 00000000..dbcae905 --- /dev/null +++ b/vendor/github.com/golang/snappy/encode_other.go @@ -0,0 +1,238 @@ +// Copyright 2016 The Snappy-Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build !amd64 appengine !gc noasm + +package snappy + +func load32(b []byte, i int) uint32 { + b = b[i : i+4 : len(b)] // Help the compiler eliminate bounds checks on the next line. + return uint32(b[0]) | uint32(b[1])<<8 | uint32(b[2])<<16 | uint32(b[3])<<24 +} + +func load64(b []byte, i int) uint64 { + b = b[i : i+8 : len(b)] // Help the compiler eliminate bounds checks on the next line. + return uint64(b[0]) | uint64(b[1])<<8 | uint64(b[2])<<16 | uint64(b[3])<<24 | + uint64(b[4])<<32 | uint64(b[5])<<40 | uint64(b[6])<<48 | uint64(b[7])<<56 +} + +// emitLiteral writes a literal chunk and returns the number of bytes written. +// +// It assumes that: +// dst is long enough to hold the encoded bytes +// 1 <= len(lit) && len(lit) <= 65536 +func emitLiteral(dst, lit []byte) int { + i, n := 0, uint(len(lit)-1) + switch { + case n < 60: + dst[0] = uint8(n)<<2 | tagLiteral + i = 1 + case n < 1<<8: + dst[0] = 60<<2 | tagLiteral + dst[1] = uint8(n) + i = 2 + default: + dst[0] = 61<<2 | tagLiteral + dst[1] = uint8(n) + dst[2] = uint8(n >> 8) + i = 3 + } + return i + copy(dst[i:], lit) +} + +// emitCopy writes a copy chunk and returns the number of bytes written. +// +// It assumes that: +// dst is long enough to hold the encoded bytes +// 1 <= offset && offset <= 65535 +// 4 <= length && length <= 65535 +func emitCopy(dst []byte, offset, length int) int { + i := 0 + // The maximum length for a single tagCopy1 or tagCopy2 op is 64 bytes. The + // threshold for this loop is a little higher (at 68 = 64 + 4), and the + // length emitted down below is is a little lower (at 60 = 64 - 4), because + // it's shorter to encode a length 67 copy as a length 60 tagCopy2 followed + // by a length 7 tagCopy1 (which encodes as 3+2 bytes) than to encode it as + // a length 64 tagCopy2 followed by a length 3 tagCopy2 (which encodes as + // 3+3 bytes). The magic 4 in the 64±4 is because the minimum length for a + // tagCopy1 op is 4 bytes, which is why a length 3 copy has to be an + // encodes-as-3-bytes tagCopy2 instead of an encodes-as-2-bytes tagCopy1. + for length >= 68 { + // Emit a length 64 copy, encoded as 3 bytes. + dst[i+0] = 63<<2 | tagCopy2 + dst[i+1] = uint8(offset) + dst[i+2] = uint8(offset >> 8) + i += 3 + length -= 64 + } + if length > 64 { + // Emit a length 60 copy, encoded as 3 bytes. + dst[i+0] = 59<<2 | tagCopy2 + dst[i+1] = uint8(offset) + dst[i+2] = uint8(offset >> 8) + i += 3 + length -= 60 + } + if length >= 12 || offset >= 2048 { + // Emit the remaining copy, encoded as 3 bytes. + dst[i+0] = uint8(length-1)<<2 | tagCopy2 + dst[i+1] = uint8(offset) + dst[i+2] = uint8(offset >> 8) + return i + 3 + } + // Emit the remaining copy, encoded as 2 bytes. + dst[i+0] = uint8(offset>>8)<<5 | uint8(length-4)<<2 | tagCopy1 + dst[i+1] = uint8(offset) + return i + 2 +} + +// extendMatch returns the largest k such that k <= len(src) and that +// src[i:i+k-j] and src[j:k] have the same contents. +// +// It assumes that: +// 0 <= i && i < j && j <= len(src) +func extendMatch(src []byte, i, j int) int { + for ; j < len(src) && src[i] == src[j]; i, j = i+1, j+1 { + } + return j +} + +func hash(u, shift uint32) uint32 { + return (u * 0x1e35a7bd) >> shift +} + +// encodeBlock encodes a non-empty src to a guaranteed-large-enough dst. It +// assumes that the varint-encoded length of the decompressed bytes has already +// been written. +// +// It also assumes that: +// len(dst) >= MaxEncodedLen(len(src)) && +// minNonLiteralBlockSize <= len(src) && len(src) <= maxBlockSize +func encodeBlock(dst, src []byte) (d int) { + // Initialize the hash table. Its size ranges from 1<<8 to 1<<14 inclusive. + // The table element type is uint16, as s < sLimit and sLimit < len(src) + // and len(src) <= maxBlockSize and maxBlockSize == 65536. + const ( + maxTableSize = 1 << 14 + // tableMask is redundant, but helps the compiler eliminate bounds + // checks. + tableMask = maxTableSize - 1 + ) + shift := uint32(32 - 8) + for tableSize := 1 << 8; tableSize < maxTableSize && tableSize < len(src); tableSize *= 2 { + shift-- + } + // In Go, all array elements are zero-initialized, so there is no advantage + // to a smaller tableSize per se. However, it matches the C++ algorithm, + // and in the asm versions of this code, we can get away with zeroing only + // the first tableSize elements. + var table [maxTableSize]uint16 + + // sLimit is when to stop looking for offset/length copies. The inputMargin + // lets us use a fast path for emitLiteral in the main loop, while we are + // looking for copies. + sLimit := len(src) - inputMargin + + // nextEmit is where in src the next emitLiteral should start from. + nextEmit := 0 + + // The encoded form must start with a literal, as there are no previous + // bytes to copy, so we start looking for hash matches at s == 1. + s := 1 + nextHash := hash(load32(src, s), shift) + + for { + // Copied from the C++ snappy implementation: + // + // Heuristic match skipping: If 32 bytes are scanned with no matches + // found, start looking only at every other byte. If 32 more bytes are + // scanned (or skipped), look at every third byte, etc.. When a match + // is found, immediately go back to looking at every byte. This is a + // small loss (~5% performance, ~0.1% density) for compressible data + // due to more bookkeeping, but for non-compressible data (such as + // JPEG) it's a huge win since the compressor quickly "realizes" the + // data is incompressible and doesn't bother looking for matches + // everywhere. + // + // The "skip" variable keeps track of how many bytes there are since + // the last match; dividing it by 32 (ie. right-shifting by five) gives + // the number of bytes to move ahead for each iteration. + skip := 32 + + nextS := s + candidate := 0 + for { + s = nextS + bytesBetweenHashLookups := skip >> 5 + nextS = s + bytesBetweenHashLookups + skip += bytesBetweenHashLookups + if nextS > sLimit { + goto emitRemainder + } + candidate = int(table[nextHash&tableMask]) + table[nextHash&tableMask] = uint16(s) + nextHash = hash(load32(src, nextS), shift) + if load32(src, s) == load32(src, candidate) { + break + } + } + + // A 4-byte match has been found. We'll later see if more than 4 bytes + // match. But, prior to the match, src[nextEmit:s] are unmatched. Emit + // them as literal bytes. + d += emitLiteral(dst[d:], src[nextEmit:s]) + + // Call emitCopy, and then see if another emitCopy could be our next + // move. Repeat until we find no match for the input immediately after + // what was consumed by the last emitCopy call. + // + // If we exit this loop normally then we need to call emitLiteral next, + // though we don't yet know how big the literal will be. We handle that + // by proceeding to the next iteration of the main loop. We also can + // exit this loop via goto if we get close to exhausting the input. + for { + // Invariant: we have a 4-byte match at s, and no need to emit any + // literal bytes prior to s. + base := s + + // Extend the 4-byte match as long as possible. + // + // This is an inlined version of: + // s = extendMatch(src, candidate+4, s+4) + s += 4 + for i := candidate + 4; s < len(src) && src[i] == src[s]; i, s = i+1, s+1 { + } + + d += emitCopy(dst[d:], base-candidate, s-base) + nextEmit = s + if s >= sLimit { + goto emitRemainder + } + + // We could immediately start working at s now, but to improve + // compression we first update the hash table at s-1 and at s. If + // another emitCopy is not our next move, also calculate nextHash + // at s+1. At least on GOARCH=amd64, these three hash calculations + // are faster as one load64 call (with some shifts) instead of + // three load32 calls. + x := load64(src, s-1) + prevHash := hash(uint32(x>>0), shift) + table[prevHash&tableMask] = uint16(s - 1) + currHash := hash(uint32(x>>8), shift) + candidate = int(table[currHash&tableMask]) + table[currHash&tableMask] = uint16(s) + if uint32(x>>8) != load32(src, candidate) { + nextHash = hash(uint32(x>>16), shift) + s++ + break + } + } + } + +emitRemainder: + if nextEmit < len(src) { + d += emitLiteral(dst[d:], src[nextEmit:]) + } + return d +} diff --git a/vendor/github.com/golang/snappy/snappy.go b/vendor/github.com/golang/snappy/snappy.go new file mode 100644 index 00000000..0cf5e379 --- /dev/null +++ b/vendor/github.com/golang/snappy/snappy.go @@ -0,0 +1,87 @@ +// Copyright 2011 The Snappy-Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package snappy implements the snappy block-based compression format. +// It aims for very high speeds and reasonable compression. +// +// The C++ snappy implementation is at https://github.com/google/snappy +package snappy // import "github.com/golang/snappy" + +import ( + "hash/crc32" +) + +/* +Each encoded block begins with the varint-encoded length of the decoded data, +followed by a sequence of chunks. Chunks begin and end on byte boundaries. The +first byte of each chunk is broken into its 2 least and 6 most significant bits +called l and m: l ranges in [0, 4) and m ranges in [0, 64). l is the chunk tag. +Zero means a literal tag. All other values mean a copy tag. + +For literal tags: + - If m < 60, the next 1 + m bytes are literal bytes. + - Otherwise, let n be the little-endian unsigned integer denoted by the next + m - 59 bytes. The next 1 + n bytes after that are literal bytes. + +For copy tags, length bytes are copied from offset bytes ago, in the style of +Lempel-Ziv compression algorithms. In particular: + - For l == 1, the offset ranges in [0, 1<<11) and the length in [4, 12). + The length is 4 + the low 3 bits of m. The high 3 bits of m form bits 8-10 + of the offset. The next byte is bits 0-7 of the offset. + - For l == 2, the offset ranges in [0, 1<<16) and the length in [1, 65). + The length is 1 + m. The offset is the little-endian unsigned integer + denoted by the next 2 bytes. + - For l == 3, this tag is a legacy format that is no longer issued by most + encoders. Nonetheless, the offset ranges in [0, 1<<32) and the length in + [1, 65). The length is 1 + m. The offset is the little-endian unsigned + integer denoted by the next 4 bytes. +*/ +const ( + tagLiteral = 0x00 + tagCopy1 = 0x01 + tagCopy2 = 0x02 + tagCopy4 = 0x03 +) + +const ( + checksumSize = 4 + chunkHeaderSize = 4 + magicChunk = "\xff\x06\x00\x00" + magicBody + magicBody = "sNaPpY" + + // maxBlockSize is the maximum size of the input to encodeBlock. It is not + // part of the wire format per se, but some parts of the encoder assume + // that an offset fits into a uint16. + // + // Also, for the framing format (Writer type instead of Encode function), + // https://github.com/google/snappy/blob/master/framing_format.txt says + // that "the uncompressed data in a chunk must be no longer than 65536 + // bytes". + maxBlockSize = 65536 + + // maxEncodedLenOfMaxBlockSize equals MaxEncodedLen(maxBlockSize), but is + // hard coded to be a const instead of a variable, so that obufLen can also + // be a const. Their equivalence is confirmed by + // TestMaxEncodedLenOfMaxBlockSize. + maxEncodedLenOfMaxBlockSize = 76490 + + obufHeaderLen = len(magicChunk) + checksumSize + chunkHeaderSize + obufLen = obufHeaderLen + maxEncodedLenOfMaxBlockSize +) + +const ( + chunkTypeCompressedData = 0x00 + chunkTypeUncompressedData = 0x01 + chunkTypePadding = 0xfe + chunkTypeStreamIdentifier = 0xff +) + +var crcTable = crc32.MakeTable(crc32.Castagnoli) + +// crc implements the checksum specified in section 3 of +// https://github.com/google/snappy/blob/master/framing_format.txt +func crc(b []byte) uint32 { + c := crc32.Update(0, crcTable, b) + return uint32(c>>15|c<<17) + 0xa282ead8 +} diff --git a/vendor/vendor.json b/vendor/vendor.json index d0d7a25f..f98eca14 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -56,6 +56,12 @@ "revision": "57eb5e1fc594ad4b0b1dbea7b286d299e0cb43c2", "revisionTime": "2015-12-24T04:54:52Z" }, + { + "checksumSHA1": "W+E/2xXcE1GmJ0Qb784ald0Fn6I=", + "path": "github.com/golang/snappy", + "revision": "d9eb7a3d35ec988b8585d4a0068e462c27d28380", + "revisionTime": "2016-05-29T05:00:41Z" + }, { "checksumSHA1": "iIUYZyoanCQQTUaWsu8b+iOSPt4=", "path": "github.com/gorilla/context", From 6c0a47fb65bec0e61f541a3b763768acf39865ba Mon Sep 17 00:00:00 2001 From: Dieter Plaetinck Date: Tue, 7 Feb 2017 19:23:07 +0100 Subject: [PATCH 3/4] slight improves --- route/grafananet.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/route/grafananet.go b/route/grafananet.go index 35c22ea7..f08b8e80 100644 --- a/route/grafananet.go +++ b/route/grafananet.go @@ -39,7 +39,7 @@ type GrafanaNet struct { timeout time.Duration sslVerify bool concurrency int - writeQueue []chan []byte + writeQueues []chan []byte shutdown chan struct{} wg *sync.WaitGroup client *http.Client @@ -95,7 +95,7 @@ func NewGrafanaNet(key, prefix, sub, regex, addr, apiKey, schemasFile string, sp timeout: time.Duration(timeout) * time.Millisecond, sslVerify: sslVerify, concurrency: concurrency, - writeQueue: make([]chan []byte, concurrency), + writeQueues: make([]chan []byte, concurrency), shutdown: make(chan struct{}), wg: new(sync.WaitGroup), @@ -108,7 +108,7 @@ func NewGrafanaNet(key, prefix, sub, regex, addr, apiKey, schemasFile string, sp numBuffered: stats.Gauge("dest=" + cleanAddr + ".unit=Metric.what=numBuffered"), } for i := 0; i < r.concurrency; i++ { - r.writeQueue[i] = make(chan []byte) + r.writeQueues[i] = make(chan []byte) } r.config.Store(baseConfig{*m, make([]*dest.Destination, 0)}) @@ -138,11 +138,11 @@ func NewGrafanaNet(key, prefix, sub, regex, addr, apiKey, schemasFile string, sp func (route *GrafanaNet) run() { metrics := make([][]*schema.MetricData, route.concurrency) + route.wg.Add(route.concurrency) for i := 0; i < route.concurrency; i++ { metrics[i] = make([]*schema.MetricData, 0, route.flushMaxNum) // start up our goroutines for writing data to tsdb-gw - route.wg.Add(1) go route.flush(i) } @@ -155,7 +155,7 @@ func (route *GrafanaNet) run() { if err != nil { panic(err) } - route.writeQueue[shard] <- data + route.writeQueues[shard] <- data route.numOut.Inc(int64(len(metrics[shard]))) metrics[shard] = metrics[shard][:0] @@ -188,7 +188,7 @@ func (route *GrafanaNet) run() { case <-route.shutdown: for shard := 0; shard < route.concurrency; shard++ { flush(shard) - close(route.writeQueue[shard]) + close(route.writeQueues[shard]) } return } @@ -203,7 +203,7 @@ func (route *GrafanaNet) flush(shard int) { Jitter: true, } body := new(bytes.Buffer) - for data := range route.writeQueue[shard] { + for data := range route.writeQueues[shard] { for { pre := time.Now() body.Reset() From 785e1ddac20b8c1f5d8840383c7ba9dfabef92cf Mon Sep 17 00:00:00 2001 From: woodsaj Date: Thu, 16 Feb 2017 10:20:11 +0800 Subject: [PATCH 4/4] add orgId option to grafanaNet route handler - this allows using the tsdb-gw "adminkey" to send data as any org. --- imperatives/imperatives.go | 18 +++++++++++++++--- route/grafananet.go | 12 ++++++++---- 2 files changed, 23 insertions(+), 7 deletions(-) diff --git a/imperatives/imperatives.go b/imperatives/imperatives.go index e4833823..d6ad98e6 100644 --- a/imperatives/imperatives.go +++ b/imperatives/imperatives.go @@ -50,6 +50,7 @@ const ( optSSLVerify word optConcurrency + optOrgId ) // we should make sure we apply changes atomatically. e.g. when changing dest between address A and pickle=false and B with pickle=true, @@ -83,6 +84,7 @@ var tokens = []toki.Def{ {Token: optTimeout, Pattern: "timeout="}, {Token: optSSLVerify, Pattern: "sslverify="}, {Token: optConcurrency, Pattern: "concurrency="}, + {Token: optOrgId, Pattern: "orgId="}, {Token: str, Pattern: "\".*\""}, {Token: sep, Pattern: "##"}, {Token: sumFn, Pattern: "sum"}, @@ -96,7 +98,7 @@ var tokens = []toki.Def{ var errFmtAddBlack = errors.New("addBlack ") var errFmtAddAgg = errors.New("addAgg ") var errFmtAddRoute = errors.New("addRoute [prefix/sub/regex=,..] [[...]] where is [prefix/sub,regex,flush,reconn,pickle,spool=...]") // note flush and reconn are ints, pickle and spool are true/false. other options are strings -var errFmtAddRouteGrafanaNet = errors.New("addRoute GrafanaNet key [prefix/sub/regex] addr apiKey schemasFile [spool=true/false sslverify=true/false bufSize=int flushMaxNum=int flushMaxWait=int timeout=int concurrency=int]") +var errFmtAddRouteGrafanaNet = errors.New("addRoute GrafanaNet key [prefix/sub/regex] addr apiKey schemasFile [spool=true/false sslverify=true/false bufSize=int flushMaxNum=int flushMaxWait=int timeout=int concurrency=int orgId=int]") var errFmtAddDest = errors.New("addDest ") // not implemented yet var errFmtAddRewriter = errors.New("addRewriter ") var errFmtModDest = errors.New("modDest ") // one or more can be specified at once @@ -305,6 +307,7 @@ func readAddRouteGrafanaNet(s *toki.Scanner, table *tbl.Table) error { var flushMaxWait = 500 // in ms var timeout = 2000 // in ms var concurrency = 10 // number of concurrent connections to tsdb-gw + var orgId = 1 for ; t.Token != toki.EOF; t = s.Next() { switch t.Token { @@ -378,13 +381,22 @@ func readAddRouteGrafanaNet(s *toki.Scanner, table *tbl.Table) error { } else { return errFmtAddRouteGrafanaNet } - + case optOrgId: + t = s.Next() + if t.Token == num { + orgId, err = strconv.Atoi(strings.TrimSpace(string(t.Value))) + if err != nil { + return err + } + } else { + return errFmtAddRouteGrafanaNet + } default: return fmt.Errorf("unexpected token %d %q", t.Token, t.Value) } } - route, err := route.NewGrafanaNet(key, prefix, sub, regex, addr, apiKey, schemasFile, spool, sslVerify, bufSize, flushMaxNum, flushMaxWait, timeout, concurrency) + route, err := route.NewGrafanaNet(key, prefix, sub, regex, addr, apiKey, schemasFile, spool, sslVerify, bufSize, flushMaxNum, flushMaxWait, timeout, concurrency, orgId) if err != nil { return err } diff --git a/route/grafananet.go b/route/grafananet.go index f08b8e80..4399075f 100644 --- a/route/grafananet.go +++ b/route/grafananet.go @@ -39,6 +39,7 @@ type GrafanaNet struct { timeout time.Duration sslVerify bool concurrency int + orgId int writeQueues []chan []byte shutdown chan struct{} wg *sync.WaitGroup @@ -56,7 +57,7 @@ type GrafanaNet struct { // NewGrafanaNet creates a special route that writes to a grafana.net datastore // We will automatically run the route and the destination // ignores spool for now -func NewGrafanaNet(key, prefix, sub, regex, addr, apiKey, schemasFile string, spool, sslVerify bool, bufSize, flushMaxNum, flushMaxWait, timeout, concurrency int) (Route, error) { +func NewGrafanaNet(key, prefix, sub, regex, addr, apiKey, schemasFile string, spool, sslVerify bool, bufSize, flushMaxNum, flushMaxWait, timeout, concurrency, orgId int) (Route, error) { m, err := matcher.New(prefix, sub, regex) if err != nil { return nil, err @@ -95,6 +96,7 @@ func NewGrafanaNet(key, prefix, sub, regex, addr, apiKey, schemasFile string, sp timeout: time.Duration(timeout) * time.Millisecond, sslVerify: sslVerify, concurrency: concurrency, + orgId: orgId, writeQueues: make([]chan []byte, concurrency), shutdown: make(chan struct{}), wg: new(sync.WaitGroup), @@ -168,11 +170,13 @@ func (route *GrafanaNet) run() { select { case buf := <-route.buf: route.numBuffered.Dec(1) - md := parseMetric(buf, route.schemas) + md := parseMetric(buf, route.schemas, route.orgId) if md == nil { continue } md.SetId() + + //re-use our []byte slice to save an allocation. buf = md.KeyBySeries(buf[:0]) hasher.Reset() hasher.Write(buf) @@ -245,7 +249,7 @@ func (route *GrafanaNet) flush(shard int) { route.wg.Done() } -func parseMetric(buf []byte, schemas persister.WhisperSchemas) *schema.MetricData { +func parseMetric(buf []byte, schemas persister.WhisperSchemas, orgId int) *schema.MetricData { msg := strings.TrimSpace(string(buf)) elements := strings.Fields(msg) @@ -279,7 +283,7 @@ func parseMetric(buf []byte, schemas persister.WhisperSchemas) *schema.MetricDat Time: int64(timestamp), Mtype: "gauge", Tags: []string{}, - OrgId: 1, // the hosted tsdb service will adjust to the correct OrgId if using a regular key. orgid 1 is only retained with admin key. + OrgId: orgId, // This may be overwritten by the TSDB-GW if it does not match the orgId of the apiKey used } return &md }