2021-11-01 14:59:38 +01:00
|
|
|
/*
|
|
|
|
*
|
|
|
|
* Copyright 2020 gRPC authors.
|
|
|
|
*
|
|
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
* you may not use this file except in compliance with the License.
|
|
|
|
* You may obtain a copy of the License at
|
|
|
|
*
|
|
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
*
|
|
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
* See the License for the specific language governing permissions and
|
|
|
|
* limitations under the License.
|
|
|
|
*
|
|
|
|
*/
|
|
|
|
|
|
|
|
// Package weightedtarget implements the weighted_target balancer.
|
2021-12-20 11:06:52 +01:00
|
|
|
//
|
|
|
|
// All APIs in this package are experimental.
|
2021-11-01 14:59:38 +01:00
|
|
|
package weightedtarget
|
|
|
|
|
|
|
|
import (
|
|
|
|
"encoding/json"
|
|
|
|
"fmt"
|
|
|
|
|
|
|
|
"google.golang.org/grpc/balancer"
|
2021-12-20 11:06:52 +01:00
|
|
|
"google.golang.org/grpc/balancer/weightedtarget/weightedaggregator"
|
|
|
|
"google.golang.org/grpc/internal/balancergroup"
|
2021-11-01 14:59:38 +01:00
|
|
|
"google.golang.org/grpc/internal/grpclog"
|
|
|
|
"google.golang.org/grpc/internal/hierarchy"
|
|
|
|
"google.golang.org/grpc/internal/pretty"
|
|
|
|
"google.golang.org/grpc/internal/wrr"
|
|
|
|
"google.golang.org/grpc/resolver"
|
|
|
|
"google.golang.org/grpc/serviceconfig"
|
|
|
|
)
|
|
|
|
|
|
|
|
// Name is the name of the weighted_target balancer.
|
|
|
|
const Name = "weighted_target_experimental"
|
|
|
|
|
|
|
|
// NewRandomWRR is the WRR constructor used to pick sub-pickers from
|
|
|
|
// sub-balancers. It's to be modified in tests.
|
|
|
|
var NewRandomWRR = wrr.NewRandom
|
|
|
|
|
|
|
|
func init() {
|
|
|
|
balancer.Register(bb{})
|
|
|
|
}
|
|
|
|
|
|
|
|
type bb struct{}
|
|
|
|
|
|
|
|
func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
|
|
|
|
b := &weightedTargetBalancer{}
|
|
|
|
b.logger = prefixLogger(b)
|
|
|
|
b.stateAggregator = weightedaggregator.New(cc, b.logger, NewRandomWRR)
|
|
|
|
b.stateAggregator.Start()
|
2021-12-20 11:06:52 +01:00
|
|
|
b.bg = balancergroup.New(cc, bOpts, b.stateAggregator, b.logger)
|
2021-11-01 14:59:38 +01:00
|
|
|
b.bg.Start()
|
|
|
|
b.logger.Infof("Created")
|
|
|
|
return b
|
|
|
|
}
|
|
|
|
|
|
|
|
func (bb) Name() string {
|
|
|
|
return Name
|
|
|
|
}
|
|
|
|
|
|
|
|
func (bb) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
|
|
|
|
return parseConfig(c)
|
|
|
|
}
|
|
|
|
|
|
|
|
type weightedTargetBalancer struct {
|
|
|
|
logger *grpclog.PrefixLogger
|
|
|
|
|
|
|
|
bg *balancergroup.BalancerGroup
|
|
|
|
stateAggregator *weightedaggregator.Aggregator
|
|
|
|
|
|
|
|
targets map[string]Target
|
|
|
|
}
|
|
|
|
|
|
|
|
// UpdateClientConnState takes the new targets in balancer group,
|
|
|
|
// creates/deletes sub-balancers and sends them update. addresses are split into
|
|
|
|
// groups based on hierarchy path.
|
|
|
|
func (b *weightedTargetBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
|
|
|
|
b.logger.Infof("Received update from resolver, balancer config: %+v", pretty.ToJSON(s.BalancerConfig))
|
|
|
|
newConfig, ok := s.BalancerConfig.(*LBConfig)
|
|
|
|
if !ok {
|
|
|
|
return fmt.Errorf("unexpected balancer config with type: %T", s.BalancerConfig)
|
|
|
|
}
|
|
|
|
addressesSplit := hierarchy.Group(s.ResolverState.Addresses)
|
|
|
|
|
|
|
|
var rebuildStateAndPicker bool
|
|
|
|
|
|
|
|
// Remove sub-pickers and sub-balancers that are not in the new config.
|
|
|
|
for name := range b.targets {
|
|
|
|
if _, ok := newConfig.Targets[name]; !ok {
|
|
|
|
b.stateAggregator.Remove(name)
|
|
|
|
b.bg.Remove(name)
|
|
|
|
// Trigger a state/picker update, because we don't want `ClientConn`
|
|
|
|
// to pick this sub-balancer anymore.
|
|
|
|
rebuildStateAndPicker = true
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// For sub-balancers in the new config
|
|
|
|
// - if it's new. add to balancer group,
|
|
|
|
// - if it's old, but has a new weight, update weight in balancer group.
|
|
|
|
//
|
|
|
|
// For all sub-balancers, forward the address/balancer config update.
|
|
|
|
for name, newT := range newConfig.Targets {
|
|
|
|
oldT, ok := b.targets[name]
|
|
|
|
if !ok {
|
|
|
|
// If this is a new sub-balancer, add weights to the picker map.
|
|
|
|
b.stateAggregator.Add(name, newT.Weight)
|
|
|
|
// Then add to the balancer group.
|
|
|
|
b.bg.Add(name, balancer.Get(newT.ChildPolicy.Name))
|
|
|
|
// Not trigger a state/picker update. Wait for the new sub-balancer
|
|
|
|
// to send its updates.
|
|
|
|
} else if newT.ChildPolicy.Name != oldT.ChildPolicy.Name {
|
|
|
|
// If the child policy name is differet, remove from balancer group
|
|
|
|
// and re-add.
|
|
|
|
b.stateAggregator.Remove(name)
|
|
|
|
b.bg.Remove(name)
|
|
|
|
b.stateAggregator.Add(name, newT.Weight)
|
|
|
|
b.bg.Add(name, balancer.Get(newT.ChildPolicy.Name))
|
|
|
|
// Trigger a state/picker update, because we don't want `ClientConn`
|
|
|
|
// to pick this sub-balancer anymore.
|
|
|
|
rebuildStateAndPicker = true
|
|
|
|
} else if newT.Weight != oldT.Weight {
|
|
|
|
// If this is an existing sub-balancer, update weight if necessary.
|
|
|
|
b.stateAggregator.UpdateWeight(name, newT.Weight)
|
|
|
|
// Trigger a state/picker update, because we don't want `ClientConn`
|
|
|
|
// should do picks with the new weights now.
|
|
|
|
rebuildStateAndPicker = true
|
|
|
|
}
|
|
|
|
|
|
|
|
// Forwards all the update:
|
|
|
|
// - addresses are from the map after splitting with hierarchy path,
|
|
|
|
// - Top level service config and attributes are the same,
|
|
|
|
// - Balancer config comes from the targets map.
|
|
|
|
//
|
|
|
|
// TODO: handle error? How to aggregate errors and return?
|
|
|
|
_ = b.bg.UpdateClientConnState(name, balancer.ClientConnState{
|
|
|
|
ResolverState: resolver.State{
|
|
|
|
Addresses: addressesSplit[name],
|
|
|
|
ServiceConfig: s.ResolverState.ServiceConfig,
|
|
|
|
Attributes: s.ResolverState.Attributes,
|
|
|
|
},
|
|
|
|
BalancerConfig: newT.ChildPolicy.Config,
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
b.targets = newConfig.Targets
|
|
|
|
|
|
|
|
if rebuildStateAndPicker {
|
|
|
|
b.stateAggregator.BuildAndUpdate()
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (b *weightedTargetBalancer) ResolverError(err error) {
|
|
|
|
b.bg.ResolverError(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (b *weightedTargetBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
|
|
|
|
b.bg.UpdateSubConnState(sc, state)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (b *weightedTargetBalancer) Close() {
|
|
|
|
b.stateAggregator.Stop()
|
|
|
|
b.bg.Close()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (b *weightedTargetBalancer) ExitIdle() {
|
|
|
|
b.bg.ExitIdle()
|
|
|
|
}
|