mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-20 07:19:17 +01:00
app/{vminsert,vmselect}: limit the access to storageNodes to getStorageNodesBucket and setStorageNodesBucket functions
This makes the code more maintainable and earier to test.
This commit is contained in:
parent
1386b6bd69
commit
976bbe3677
@ -459,12 +459,6 @@ type storageNode struct {
|
||||
sendDurationSeconds *metrics.FloatCounter
|
||||
}
|
||||
|
||||
func getStorageNodes() []*storageNode {
|
||||
v := storageNodes.Load()
|
||||
snb := v.(*storageNodesBucket)
|
||||
return snb.sns
|
||||
}
|
||||
|
||||
type storageNodesBucket struct {
|
||||
ms *metrics.Set
|
||||
sns []*storageNode
|
||||
@ -475,6 +469,19 @@ type storageNodesBucket struct {
|
||||
// storageNodes contains a list of vmstorage node clients.
|
||||
var storageNodes atomic.Value
|
||||
|
||||
func getStorageNodesBucket() *storageNodesBucket {
|
||||
return storageNodes.Load().(*storageNodesBucket)
|
||||
}
|
||||
|
||||
func setStorageNodesBucket(snb *storageNodesBucket) {
|
||||
storageNodes.Store(snb)
|
||||
}
|
||||
|
||||
func getStorageNodes() []*storageNode {
|
||||
snb := getStorageNodesBucket()
|
||||
return snb.sns
|
||||
}
|
||||
|
||||
// nodesHash is used for consistently selecting a storage node by key.
|
||||
var nodesHash *consistentHash
|
||||
|
||||
@ -484,6 +491,17 @@ var nodesHash *consistentHash
|
||||
//
|
||||
// Call MustStop when the initialized vmstorage connections are no longer needed.
|
||||
func Init(addrs []string, hashSeed uint64) {
|
||||
snb := initStorageNodes(addrs, hashSeed)
|
||||
setStorageNodesBucket(snb)
|
||||
}
|
||||
|
||||
// MustStop stops netstorage.
|
||||
func MustStop() {
|
||||
snb := getStorageNodesBucket()
|
||||
mustStopStorageNodes(snb)
|
||||
}
|
||||
|
||||
func initStorageNodes(addrs []string, hashSeed uint64) *storageNodesBucket {
|
||||
if len(addrs) == 0 {
|
||||
logger.Panicf("BUG: addrs must be non-empty")
|
||||
}
|
||||
@ -551,20 +569,15 @@ func Init(addrs []string, hashSeed uint64) {
|
||||
}
|
||||
|
||||
metrics.RegisterSet(ms)
|
||||
storageNodes.Store(&storageNodesBucket{
|
||||
return &storageNodesBucket{
|
||||
ms: ms,
|
||||
sns: sns,
|
||||
stopCh: stopCh,
|
||||
wg: &wg,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// MustStop stops netstorage.
|
||||
func MustStop() {
|
||||
v := storageNodes.Load()
|
||||
snb := v.(*storageNodesBucket)
|
||||
storageNodes.Store(&storageNodesBucket{})
|
||||
|
||||
func mustStopStorageNodes(snb *storageNodesBucket) {
|
||||
close(snb.stopCh)
|
||||
for _, sn := range snb.sns {
|
||||
sn.brCond.Broadcast()
|
||||
|
@ -2367,12 +2367,6 @@ func readUint64(bc *handshake.BufferedConn) (uint64, error) {
|
||||
return n, nil
|
||||
}
|
||||
|
||||
func getStorageNodes() []*storageNode {
|
||||
v := storageNodes.Load()
|
||||
snb := v.(*storageNodesBucket)
|
||||
return snb.sns
|
||||
}
|
||||
|
||||
type storageNodesBucket struct {
|
||||
ms *metrics.Set
|
||||
sns []*storageNode
|
||||
@ -2380,10 +2374,34 @@ type storageNodesBucket struct {
|
||||
|
||||
var storageNodes atomic.Value
|
||||
|
||||
func getStorageNodesBucket() *storageNodesBucket {
|
||||
return storageNodes.Load().(*storageNodesBucket)
|
||||
}
|
||||
|
||||
func setStorageNodesBucket(snb *storageNodesBucket) {
|
||||
storageNodes.Store(snb)
|
||||
}
|
||||
|
||||
func getStorageNodes() []*storageNode {
|
||||
snb := getStorageNodesBucket()
|
||||
return snb.sns
|
||||
}
|
||||
|
||||
// Init initializes storage nodes' connections to the given addrs.
|
||||
//
|
||||
// MustStop must be called when the initialized connections are no longer needed.
|
||||
func Init(addrs []string) {
|
||||
snb := initStorageNodes(addrs)
|
||||
setStorageNodesBucket(snb)
|
||||
}
|
||||
|
||||
// MustStop gracefully stops netstorage.
|
||||
func MustStop() {
|
||||
snb := getStorageNodesBucket()
|
||||
mustStopStorageNodes(snb)
|
||||
}
|
||||
|
||||
func initStorageNodes(addrs []string) *storageNodesBucket {
|
||||
if len(addrs) == 0 {
|
||||
logger.Panicf("BUG: addrs must be non-empty")
|
||||
}
|
||||
@ -2425,16 +2443,13 @@ func Init(addrs []string) {
|
||||
sns = append(sns, sn)
|
||||
}
|
||||
metrics.RegisterSet(ms)
|
||||
storageNodes.Store(&storageNodesBucket{
|
||||
return &storageNodesBucket{
|
||||
sns: sns,
|
||||
ms: ms,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// MustStop gracefully stops netstorage.
|
||||
func MustStop() {
|
||||
snb := storageNodes.Load().(*storageNodesBucket)
|
||||
storageNodes.Store(&storageNodesBucket{})
|
||||
func mustStopStorageNodes(snb *storageNodesBucket) {
|
||||
for _, sn := range snb.sns {
|
||||
sn.connPool.MustStop()
|
||||
}
|
||||
|
@ -37,13 +37,15 @@ func TestTmpBlocksFileConcurrent(t *testing.T) {
|
||||
ch <- testTmpBlocksFile()
|
||||
}()
|
||||
}
|
||||
timer := time.NewTimer(30 * time.Second)
|
||||
defer timer.Stop()
|
||||
for i := 0; i < concurrency; i++ {
|
||||
select {
|
||||
case err := <-ch:
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
case <-time.After(30 * time.Second):
|
||||
case <-timer.C:
|
||||
t.Fatalf("timeout")
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user