Viper on Etcd
Thu Oct 10, 2019 · 241 words

背景

希望在Go项目中使用Viper来读取Etcd中的配置文件并监听其变化

实现代码

package config

import (
        "github.com/spf13/viper"
        _ "github.com/spf13/viper/remote"
)

const (
        ProviderType     = "etcd"
        ProviderEndpoint = "http://192.168.1.63:2379"
        Path             = "/config.json"
)

func Setup() *viper.Viper {
        c := viper.New()
        c.SetConfigType("json")
        if e := c.AddRemoteProvider(ProviderType, ProviderEndpoint, Path); e != nil {
                panic(e)
        }
        if e := c.ReadRemoteConfig(); e != nil {
                panic(e)
        }
        if e := c.WatchRemoteConfigOnChannel(); e != nil {
                panic(e)
        }
        return c
}

Viper是如何实现监听其变化的?

通过查看源码发现 在代码$GOPATH/go/src/github.com/xordataexchange/crypt/backend/etcd/etcd.go:82附近

// ... 省略其他代码

for {
    var resp *goetcd.Response
    var err error
    // if c.waitIndex == 0 {
    // 	resp, err = c.client.Get(key, false, false)
    // 	if err != nil {
    // 		respChan <- &backend.Response{nil, err}
    // 		time.Sleep(time.Second * 5)
    // 		continue
    // 	}
    // 	c.waitIndex = resp.EtcdIndex
    // 	respChan <- &backend.Response{[]byte(resp.Node.Value), nil}
    // }
    // resp, err = c.client.Watch(key, c.waitIndex+1, false, nil, stop)
    resp, err = watcher.Next(ctx)
    if err != nil {
        respChan <- &backend.Response{nil, err}
        time.Sleep(time.Second * 5)
        continue
    }
    c.waitIndex = resp.Node.ModifiedIndex
    respChan <- &backend.Response{[]byte(resp.Node.Value), nil}
}

// ... 省略其他代码

是通过 Watcher.Next(ctx)来实现的, 而这其中的代码

func (hw *httpWatcher) Next(ctx context.Context) (*Response, error) {
	for {
		httpresp, body, err := hw.client.Do(ctx, &hw.nextWait)
		if err != nil {
			return nil, err
		}

		resp, err := unmarshalHTTPResponse(httpresp.StatusCode, httpresp.Header, body)
		if err != nil {
			if err == ErrEmptyBody {
				continue
			}
			return nil, err
		}

		hw.nextWait.WaitIndex = resp.Node.ModifiedIndex + 1
		return resp, nil
	}
}

在这里通过不断的发送http请求到etcd服务端来查询如果得到服务器的非异常响应则表示键值有变化,之后就会把response中的结果(即键值(文件内容))的字节给反序列化到viper.kvstoremore

问题

轮询?

Q: 通过一个死循环去发送http请求轮询那CPU资源占用岂不是会很恐怖??

A: 安啦, etcd-server watch功能采用的long polling设计, 因此发出的http请求会被阻塞住,直到server端有返回或者超时异常(在viper中会睡眠5s然后又重试咯); 这个死循环中大部分时间都会阻塞在Socket-IO上不会占用很多的CPU。但是因为会阻塞,因此不能在业务线程中来Watch噢,而是应该在另外的线程中来处理,如果得到响应再通知业务线程更新配置。

REFERENCE:

back