mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-15 16:30:55 +01:00
5b3cbd4db1
* app/vlinsert: add support of loki push protocol - implemented loki push protocol for both Protobuf and JSON formats - added examples in documentation - added example docker-compose Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> * app/vlinsert: move protobuf metric into its own file Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> * deployment/docker/victorialogs/promtail: update reference to docker image Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> * deployment/docker/victorialogs/promtail: make volume name unique Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> * app/vlinsert/loki: add license reference Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> * deployment/docker/victorialogs/promtail: fix volume name Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> * docs/VictoriaLogs/data-ingestion: add stream fields for loki JSON ingestion example Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> * app/vlinsert/loki: move entities to places where those are used Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> * app/vlinsert/loki: refactor to use common components - use CommonParameters from insertutils - stop ingestion after first error similar to elasticsearch and jsonline Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> * app/vlinsert/loki: address review feedback - add missing logstorage.PutLogRows calls - refactor tenant ID parsing to use common function - reduce number of allocations for parsing by reusing logfields slices - add tests and benchmarks for requests processing funcs Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> --------- Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>
130 lines
3.2 KiB
Go
130 lines
3.2 KiB
Go
package logstorage
|
|
|
|
import (
|
|
"fmt"
|
|
"net/http"
|
|
"strconv"
|
|
"strings"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
|
)
|
|
|
|
// TenantID identifies the tenant that owns a log stream.
//
// A tenant is addressed by an (AccountID, ProjectID) pair,
// and every log stream belongs to exactly one TenantID.
type TenantID struct {
	// AccountID is the account part of the tenant identifier.
	AccountID uint32

	// ProjectID is the project part of the tenant identifier.
	ProjectID uint32
}
|
|
|
|
// Reset resets tid.
|
|
func (tid *TenantID) Reset() {
|
|
tid.AccountID = 0
|
|
tid.ProjectID = 0
|
|
}
|
|
|
|
// String returns human-readable representation of tid
|
|
func (tid *TenantID) String() string {
|
|
return fmt.Sprintf("{accountID=%d,projectID=%d}", tid.AccountID, tid.ProjectID)
|
|
}
|
|
|
|
// equal returns true if tid equals to a.
|
|
func (tid *TenantID) equal(a *TenantID) bool {
|
|
return tid.AccountID == a.AccountID && tid.ProjectID == a.ProjectID
|
|
}
|
|
|
|
// less returns true if tid is less than a.
|
|
func (tid *TenantID) less(a *TenantID) bool {
|
|
if tid.AccountID != a.AccountID {
|
|
return tid.AccountID < a.AccountID
|
|
}
|
|
return tid.ProjectID < a.ProjectID
|
|
}
|
|
|
|
// marshal appends the marshaled tid to dst and returns the result
|
|
func (tid *TenantID) marshal(dst []byte) []byte {
|
|
dst = encoding.MarshalUint32(dst, tid.AccountID)
|
|
dst = encoding.MarshalUint32(dst, tid.ProjectID)
|
|
return dst
|
|
}
|
|
|
|
// unmarshal unmarshals tid from src and returns the remaining tail.
|
|
func (tid *TenantID) unmarshal(src []byte) ([]byte, error) {
|
|
if len(src) < 8 {
|
|
return src, fmt.Errorf("cannot unmarshal tenantID from %d bytes; need at least 8 bytes", len(src))
|
|
}
|
|
tid.AccountID = encoding.UnmarshalUint32(src[:4])
|
|
tid.ProjectID = encoding.UnmarshalUint32(src[4:])
|
|
return src[8:], nil
|
|
}
|
|
|
|
// GetTenantIDFromRequest returns tenantID from r.
|
|
func GetTenantIDFromRequest(r *http.Request) (TenantID, error) {
|
|
var tenantID TenantID
|
|
|
|
accountID, err := getUint32FromHeader(r, "AccountID")
|
|
if err != nil {
|
|
return tenantID, err
|
|
}
|
|
projectID, err := getUint32FromHeader(r, "ProjectID")
|
|
if err != nil {
|
|
return tenantID, err
|
|
}
|
|
|
|
tenantID.AccountID = accountID
|
|
tenantID.ProjectID = projectID
|
|
return tenantID, nil
|
|
}
|
|
|
|
// GetTenantIDFromString returns tenantID from s.
|
|
// String is expected in the form of accountID:projectID
|
|
func GetTenantIDFromString(s string) (TenantID, error) {
|
|
var tenantID TenantID
|
|
colon := strings.Index(s, ":")
|
|
if colon < 0 {
|
|
account, err := getUint32FromString(s)
|
|
if err != nil {
|
|
return tenantID, fmt.Errorf("cannot parse %q as TenantID: %w", s, err)
|
|
}
|
|
tenantID.AccountID = account
|
|
|
|
return tenantID, nil
|
|
}
|
|
|
|
account, err := getUint32FromString(s[:colon])
|
|
if err != nil {
|
|
return tenantID, fmt.Errorf("cannot parse %q as TenantID: %w", s, err)
|
|
}
|
|
tenantID.AccountID = account
|
|
|
|
project, err := getUint32FromString(s[colon+1:])
|
|
if err != nil {
|
|
return tenantID, fmt.Errorf("cannot parse %q as TenantID: %w", s, err)
|
|
}
|
|
tenantID.ProjectID = project
|
|
|
|
return tenantID, nil
|
|
}
|
|
|
|
// getUint32FromHeader returns the value of the headerName header of r
// parsed as a base-10 uint32.
//
// A missing or empty header yields 0 without an error.
// If the header value is not a valid uint32, the returned error names
// the offending header, so callers reading several headers can tell
// which one is malformed.
func getUint32FromHeader(r *http.Request, headerName string) (uint32, error) {
	s := r.Header.Get(headerName)
	if s == "" {
		return 0, nil
	}

	n, err := strconv.ParseUint(s, 10, 32)
	if err != nil {
		return 0, fmt.Errorf("cannot parse %s header %q: %w", headerName, s, err)
	}
	return uint32(n), nil
}
|
|
|
|
// getUint32FromString parses s as a base-10 uint32.
//
// An empty string yields 0 without an error.
func getUint32FromString(s string) (uint32, error) {
	if s == "" {
		return 0, nil
	}

	v, err := strconv.ParseUint(s, 10, 32)
	if err == nil {
		return uint32(v), nil
	}
	return 0, fmt.Errorf("cannot parse %q as uint32: %w", s, err)
}
|