mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-07 08:32:18 +01:00
b6d88bac04
The upstream fasthttp may contain issues like 996610f021
,
plus a code that isn't used by VictoriaMetrics. So let's use a private copy under our control instead.
441 lines
11 KiB
Go
441 lines
11 KiB
Go
package fasthttp
|
|
|
|
import (
|
|
"bytes"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"sync"
|
|
|
|
"github.com/VictoriaMetrics/fasthttp/stackless"
|
|
"github.com/klauspost/compress/flate"
|
|
"github.com/klauspost/compress/gzip"
|
|
"github.com/klauspost/compress/zlib"
|
|
"github.com/valyala/bytebufferpool"
|
|
)
|
|
|
|
// Supported compression levels.
|
|
const (
|
|
CompressNoCompression = flate.NoCompression
|
|
CompressBestSpeed = flate.BestSpeed
|
|
CompressBestCompression = flate.BestCompression
|
|
CompressDefaultCompression = 6 // flate.DefaultCompression
|
|
CompressHuffmanOnly = -2 // flate.HuffmanOnly
|
|
)
|
|
|
|
func acquireGzipReader(r io.Reader) (*gzip.Reader, error) {
|
|
v := gzipReaderPool.Get()
|
|
if v == nil {
|
|
return gzip.NewReader(r)
|
|
}
|
|
zr := v.(*gzip.Reader)
|
|
if err := zr.Reset(r); err != nil {
|
|
return nil, err
|
|
}
|
|
return zr, nil
|
|
}
|
|
|
|
func releaseGzipReader(zr *gzip.Reader) {
|
|
zr.Close()
|
|
gzipReaderPool.Put(zr)
|
|
}
|
|
|
|
var gzipReaderPool sync.Pool
|
|
|
|
func acquireFlateReader(r io.Reader) (io.ReadCloser, error) {
|
|
v := flateReaderPool.Get()
|
|
if v == nil {
|
|
zr, err := zlib.NewReader(r)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return zr, nil
|
|
}
|
|
zr := v.(io.ReadCloser)
|
|
if err := resetFlateReader(zr, r); err != nil {
|
|
return nil, err
|
|
}
|
|
return zr, nil
|
|
}
|
|
|
|
func releaseFlateReader(zr io.ReadCloser) {
|
|
zr.Close()
|
|
flateReaderPool.Put(zr)
|
|
}
|
|
|
|
func resetFlateReader(zr io.ReadCloser, r io.Reader) error {
|
|
zrr, ok := zr.(zlib.Resetter)
|
|
if !ok {
|
|
panic("BUG: zlib.Reader doesn't implement zlib.Resetter???")
|
|
}
|
|
return zrr.Reset(r, nil)
|
|
}
|
|
|
|
var flateReaderPool sync.Pool
|
|
|
|
func acquireStacklessGzipWriter(w io.Writer, level int) stackless.Writer {
|
|
nLevel := normalizeCompressLevel(level)
|
|
p := stacklessGzipWriterPoolMap[nLevel]
|
|
v := p.Get()
|
|
if v == nil {
|
|
return stackless.NewWriter(w, func(w io.Writer) stackless.Writer {
|
|
return acquireRealGzipWriter(w, level)
|
|
})
|
|
}
|
|
sw := v.(stackless.Writer)
|
|
sw.Reset(w)
|
|
return sw
|
|
}
|
|
|
|
func releaseStacklessGzipWriter(sw stackless.Writer, level int) {
|
|
sw.Close()
|
|
nLevel := normalizeCompressLevel(level)
|
|
p := stacklessGzipWriterPoolMap[nLevel]
|
|
p.Put(sw)
|
|
}
|
|
|
|
func acquireRealGzipWriter(w io.Writer, level int) *gzip.Writer {
|
|
nLevel := normalizeCompressLevel(level)
|
|
p := realGzipWriterPoolMap[nLevel]
|
|
v := p.Get()
|
|
if v == nil {
|
|
zw, err := gzip.NewWriterLevel(w, level)
|
|
if err != nil {
|
|
panic(fmt.Sprintf("BUG: unexpected error from gzip.NewWriterLevel(%d): %s", level, err))
|
|
}
|
|
return zw
|
|
}
|
|
zw := v.(*gzip.Writer)
|
|
zw.Reset(w)
|
|
return zw
|
|
}
|
|
|
|
func releaseRealGzipWriter(zw *gzip.Writer, level int) {
|
|
zw.Close()
|
|
nLevel := normalizeCompressLevel(level)
|
|
p := realGzipWriterPoolMap[nLevel]
|
|
p.Put(zw)
|
|
}
|
|
|
|
var (
|
|
stacklessGzipWriterPoolMap = newCompressWriterPoolMap()
|
|
realGzipWriterPoolMap = newCompressWriterPoolMap()
|
|
)
|
|
|
|
// AppendGzipBytesLevel appends gzipped src to dst using the given
|
|
// compression level and returns the resulting dst.
|
|
//
|
|
// Supported compression levels are:
|
|
//
|
|
// * CompressNoCompression
|
|
// * CompressBestSpeed
|
|
// * CompressBestCompression
|
|
// * CompressDefaultCompression
|
|
// * CompressHuffmanOnly
|
|
func AppendGzipBytesLevel(dst, src []byte, level int) []byte {
|
|
w := &byteSliceWriter{dst}
|
|
WriteGzipLevel(w, src, level)
|
|
return w.b
|
|
}
|
|
|
|
// WriteGzipLevel writes gzipped p to w using the given compression level
|
|
// and returns the number of compressed bytes written to w.
|
|
//
|
|
// Supported compression levels are:
|
|
//
|
|
// * CompressNoCompression
|
|
// * CompressBestSpeed
|
|
// * CompressBestCompression
|
|
// * CompressDefaultCompression
|
|
// * CompressHuffmanOnly
|
|
func WriteGzipLevel(w io.Writer, p []byte, level int) (int, error) {
|
|
switch w.(type) {
|
|
case *byteSliceWriter,
|
|
*bytes.Buffer,
|
|
*ByteBuffer,
|
|
*bytebufferpool.ByteBuffer:
|
|
// These writers don't block, so we can just use stacklessWriteGzip
|
|
ctx := &compressCtx{
|
|
w: w,
|
|
p: p,
|
|
level: level,
|
|
}
|
|
stacklessWriteGzip(ctx)
|
|
return len(p), nil
|
|
default:
|
|
zw := acquireStacklessGzipWriter(w, level)
|
|
n, err := zw.Write(p)
|
|
releaseStacklessGzipWriter(zw, level)
|
|
return n, err
|
|
}
|
|
}
|
|
|
|
var stacklessWriteGzip = stackless.NewFunc(nonblockingWriteGzip)
|
|
|
|
func nonblockingWriteGzip(ctxv interface{}) {
|
|
ctx := ctxv.(*compressCtx)
|
|
zw := acquireRealGzipWriter(ctx.w, ctx.level)
|
|
|
|
_, err := zw.Write(ctx.p)
|
|
if err != nil {
|
|
panic(fmt.Sprintf("BUG: gzip.Writer.Write for len(p)=%d returned unexpected error: %s", len(ctx.p), err))
|
|
}
|
|
|
|
releaseRealGzipWriter(zw, ctx.level)
|
|
}
|
|
|
|
// WriteGzip writes gzipped p to w and returns the number of compressed
|
|
// bytes written to w.
|
|
func WriteGzip(w io.Writer, p []byte) (int, error) {
|
|
return WriteGzipLevel(w, p, CompressDefaultCompression)
|
|
}
|
|
|
|
// AppendGzipBytes appends gzipped src to dst and returns the resulting dst.
|
|
func AppendGzipBytes(dst, src []byte) []byte {
|
|
return AppendGzipBytesLevel(dst, src, CompressDefaultCompression)
|
|
}
|
|
|
|
// WriteGunzip writes ungzipped p to w and returns the number of uncompressed
|
|
// bytes written to w.
|
|
func WriteGunzip(w io.Writer, p []byte) (int, error) {
|
|
r := &byteSliceReader{p}
|
|
zr, err := acquireGzipReader(r)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
n, err := copyZeroAlloc(w, zr)
|
|
releaseGzipReader(zr)
|
|
nn := int(n)
|
|
if int64(nn) != n {
|
|
return 0, fmt.Errorf("too much data gunzipped: %d", n)
|
|
}
|
|
return nn, err
|
|
}
|
|
|
|
// AppendGunzipBytes appends gunzipped src to dst and returns the resulting dst.
|
|
func AppendGunzipBytes(dst, src []byte) ([]byte, error) {
|
|
w := &byteSliceWriter{dst}
|
|
_, err := WriteGunzip(w, src)
|
|
return w.b, err
|
|
}
|
|
|
|
// AppendDeflateBytesLevel appends deflated src to dst using the given
|
|
// compression level and returns the resulting dst.
|
|
//
|
|
// Supported compression levels are:
|
|
//
|
|
// * CompressNoCompression
|
|
// * CompressBestSpeed
|
|
// * CompressBestCompression
|
|
// * CompressDefaultCompression
|
|
// * CompressHuffmanOnly
|
|
func AppendDeflateBytesLevel(dst, src []byte, level int) []byte {
|
|
w := &byteSliceWriter{dst}
|
|
WriteDeflateLevel(w, src, level)
|
|
return w.b
|
|
}
|
|
|
|
// WriteDeflateLevel writes deflated p to w using the given compression level
|
|
// and returns the number of compressed bytes written to w.
|
|
//
|
|
// Supported compression levels are:
|
|
//
|
|
// * CompressNoCompression
|
|
// * CompressBestSpeed
|
|
// * CompressBestCompression
|
|
// * CompressDefaultCompression
|
|
// * CompressHuffmanOnly
|
|
func WriteDeflateLevel(w io.Writer, p []byte, level int) (int, error) {
|
|
switch w.(type) {
|
|
case *byteSliceWriter,
|
|
*bytes.Buffer,
|
|
*ByteBuffer,
|
|
*bytebufferpool.ByteBuffer:
|
|
// These writers don't block, so we can just use stacklessWriteDeflate
|
|
ctx := &compressCtx{
|
|
w: w,
|
|
p: p,
|
|
level: level,
|
|
}
|
|
stacklessWriteDeflate(ctx)
|
|
return len(p), nil
|
|
default:
|
|
zw := acquireStacklessDeflateWriter(w, level)
|
|
n, err := zw.Write(p)
|
|
releaseStacklessDeflateWriter(zw, level)
|
|
return n, err
|
|
}
|
|
}
|
|
|
|
var stacklessWriteDeflate = stackless.NewFunc(nonblockingWriteDeflate)
|
|
|
|
func nonblockingWriteDeflate(ctxv interface{}) {
|
|
ctx := ctxv.(*compressCtx)
|
|
zw := acquireRealDeflateWriter(ctx.w, ctx.level)
|
|
|
|
_, err := zw.Write(ctx.p)
|
|
if err != nil {
|
|
panic(fmt.Sprintf("BUG: zlib.Writer.Write for len(p)=%d returned unexpected error: %s", len(ctx.p), err))
|
|
}
|
|
|
|
releaseRealDeflateWriter(zw, ctx.level)
|
|
}
|
|
|
|
type compressCtx struct {
|
|
w io.Writer
|
|
p []byte
|
|
level int
|
|
}
|
|
|
|
// WriteDeflate writes deflated p to w and returns the number of compressed
|
|
// bytes written to w.
|
|
func WriteDeflate(w io.Writer, p []byte) (int, error) {
|
|
return WriteDeflateLevel(w, p, CompressDefaultCompression)
|
|
}
|
|
|
|
// AppendDeflateBytes appends deflated src to dst and returns the resulting dst.
|
|
func AppendDeflateBytes(dst, src []byte) []byte {
|
|
return AppendDeflateBytesLevel(dst, src, CompressDefaultCompression)
|
|
}
|
|
|
|
// WriteInflate writes inflated p to w and returns the number of uncompressed
|
|
// bytes written to w.
|
|
func WriteInflate(w io.Writer, p []byte) (int, error) {
|
|
r := &byteSliceReader{p}
|
|
zr, err := acquireFlateReader(r)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
n, err := copyZeroAlloc(w, zr)
|
|
releaseFlateReader(zr)
|
|
nn := int(n)
|
|
if int64(nn) != n {
|
|
return 0, fmt.Errorf("too much data inflated: %d", n)
|
|
}
|
|
return nn, err
|
|
}
|
|
|
|
// AppendInflateBytes appends inflated src to dst and returns the resulting dst.
|
|
func AppendInflateBytes(dst, src []byte) ([]byte, error) {
|
|
w := &byteSliceWriter{dst}
|
|
_, err := WriteInflate(w, src)
|
|
return w.b, err
|
|
}
|
|
|
|
type byteSliceWriter struct {
|
|
b []byte
|
|
}
|
|
|
|
func (w *byteSliceWriter) Write(p []byte) (int, error) {
|
|
w.b = append(w.b, p...)
|
|
return len(p), nil
|
|
}
|
|
|
|
type byteSliceReader struct {
|
|
b []byte
|
|
}
|
|
|
|
func (r *byteSliceReader) Read(p []byte) (int, error) {
|
|
if len(r.b) == 0 {
|
|
return 0, io.EOF
|
|
}
|
|
n := copy(p, r.b)
|
|
r.b = r.b[n:]
|
|
return n, nil
|
|
}
|
|
|
|
func acquireStacklessDeflateWriter(w io.Writer, level int) stackless.Writer {
|
|
nLevel := normalizeCompressLevel(level)
|
|
p := stacklessDeflateWriterPoolMap[nLevel]
|
|
v := p.Get()
|
|
if v == nil {
|
|
return stackless.NewWriter(w, func(w io.Writer) stackless.Writer {
|
|
return acquireRealDeflateWriter(w, level)
|
|
})
|
|
}
|
|
sw := v.(stackless.Writer)
|
|
sw.Reset(w)
|
|
return sw
|
|
}
|
|
|
|
func releaseStacklessDeflateWriter(sw stackless.Writer, level int) {
|
|
sw.Close()
|
|
nLevel := normalizeCompressLevel(level)
|
|
p := stacklessDeflateWriterPoolMap[nLevel]
|
|
p.Put(sw)
|
|
}
|
|
|
|
func acquireRealDeflateWriter(w io.Writer, level int) *zlib.Writer {
|
|
nLevel := normalizeCompressLevel(level)
|
|
p := realDeflateWriterPoolMap[nLevel]
|
|
v := p.Get()
|
|
if v == nil {
|
|
zw, err := zlib.NewWriterLevel(w, level)
|
|
if err != nil {
|
|
panic(fmt.Sprintf("BUG: unexpected error from zlib.NewWriterLevel(%d): %s", level, err))
|
|
}
|
|
return zw
|
|
}
|
|
zw := v.(*zlib.Writer)
|
|
zw.Reset(w)
|
|
return zw
|
|
}
|
|
|
|
func releaseRealDeflateWriter(zw *zlib.Writer, level int) {
|
|
zw.Close()
|
|
nLevel := normalizeCompressLevel(level)
|
|
p := realDeflateWriterPoolMap[nLevel]
|
|
p.Put(zw)
|
|
}
|
|
|
|
var (
|
|
stacklessDeflateWriterPoolMap = newCompressWriterPoolMap()
|
|
realDeflateWriterPoolMap = newCompressWriterPoolMap()
|
|
)
|
|
|
|
func newCompressWriterPoolMap() []*sync.Pool {
|
|
// Initialize pools for all the compression levels defined
|
|
// in https://golang.org/pkg/compress/flate/#pkg-constants .
|
|
// Compression levels are normalized with normalizeCompressLevel,
|
|
// so the fit [0..11].
|
|
var m []*sync.Pool
|
|
for i := 0; i < 12; i++ {
|
|
m = append(m, &sync.Pool{})
|
|
}
|
|
return m
|
|
}
|
|
|
|
func isFileCompressible(f *os.File, minCompressRatio float64) bool {
|
|
// Try compressing the first 4kb of of the file
|
|
// and see if it can be compressed by more than
|
|
// the given minCompressRatio.
|
|
b := AcquireByteBuffer()
|
|
zw := acquireStacklessGzipWriter(b, CompressDefaultCompression)
|
|
lr := &io.LimitedReader{
|
|
R: f,
|
|
N: 4096,
|
|
}
|
|
_, err := copyZeroAlloc(zw, lr)
|
|
releaseStacklessGzipWriter(zw, CompressDefaultCompression)
|
|
f.Seek(0, 0)
|
|
if err != nil {
|
|
return false
|
|
}
|
|
|
|
n := 4096 - lr.N
|
|
zn := len(b.B)
|
|
ReleaseByteBuffer(b)
|
|
return float64(zn) < float64(n)*minCompressRatio
|
|
}
|
|
|
|
// normalizes compression level into [0..11], so it could be used as an index
|
|
// in *PoolMap.
|
|
func normalizeCompressLevel(level int) int {
|
|
// -2 is the lowest compression level - CompressHuffmanOnly
|
|
// 9 is the highest compression level - CompressBestCompression
|
|
if level < -2 || level > 9 {
|
|
level = CompressDefaultCompression
|
|
}
|
|
return level + 2
|
|
}
|