Skip to content

Latest commit

 

History

History
 
 

第四章 使用配置中心

前面的章节中,我们把配置以yml文件的方式放到配置目录conf中,并通过go-config把其加载读到应用中。

Micro生态链中使用go-config管理配置,但是目前go-config定位是客户端插件,即是说它并没有充当配置中心的能力,不过,可以通过接口使用其它如etcd、etcd、k8s等具备kv存储能力的配置服务器。

go-config与配置服务器组合,足以俱备配置中心的能力。

go-config在Micro体系中工作层次如下图所示:

本章我们重点介绍如何使用gRPC作为配置中心,因为etcd、etcd、k8s使用方式大同小异,不过多赘述。

go-config所有使用方式,包括本地与中心服务可参考下列示例

  • Env 本地 基于环境变量
  • File 本地 基于配置文件
  • Flag 本地 基于命令行Flag参数文件
  • memory 本地 基于内存方式配置
  • microcli 本地 基于MicroCli参数配置
  • gRPC 使用gRPC服务作为配置中心
  • etcd 使用etcd服务作为配置中心
  • etcd 使用etcd服务作为配置中心
  • k8s 使用K8s服务作为配置中心

gRPC Server

应用与gRPC Server之间的交互如下

如图所示,Go-config与Config Server交互有下面几个重要点:

  • Load方法负责触发指定配置源的Read(Next)方法,并向配置中心Read接口请求配置数据
  • 同时,Load会初始化侦听器Watcher,它负责循环向数据中心的Watch接口请求最新配置,并判断是否有数据变动
  • Watcher判断有变动后,把变动通过Next方法告诉应用
客户端 go-config中间层 服务端
Read Read->Watch->Next Read
Watch Watch->Next Watch

注意,客户端Read逻辑和Watch逻辑虽然都有Watch->Next步骤,但是各自的逻辑是不一样的。它们由不同的loadWatch监管。

我们来看gRPC Config Server的核心代码

func main() {

	// 灾难恢复
	defer func() {
		if r := recover(); r != nil {
			log.Logf("[main] Recovered in f %v", r)
		}
	}()

	// 加载并侦听配置文件
	err := loadAndWatchConfigFile()
	if err != nil {
		log.Fatal(err)
	}

	// 新建grpc Server服务
	service := grpc2.NewServer()
	proto.RegisterSourceServer(service, new(Service))
	ts, err := net.Listen("tcp", ":9600")
	if err != nil {
		log.Fatal(err)
	}
	log.Logf("configServer started")

	// 启动
	err = service.Serve(ts)
	if err != nil {
		log.Fatal(err)
	}
}

func (s Service) Read(ctx context.Context, req *proto.ReadRequest) (rsp *proto.ReadResponse, err error) {

	appName := parsePath(req.Path)

	rsp = &proto.ReadResponse{
		ChangeSet: getConfig(appName),
	}
	return
}

func (s Service) Watch(req *proto.WatchRequest, server proto.Source_WatchServer) (err error) {

	appName := parsePath(req.Path)
	rsp := &proto.WatchResponse{
		ChangeSet: getConfig(appName),
	}
	if err = server.Send(rsp); err != nil {
		log.Logf("[Watch] 侦听处理异常,%s", err)
		return err
	}

	return
}

func loadAndWatchConfigFile() (err error) {

	// 加载每个应用的配置文件
	for _, app := range apps {
		if err := config.Load(file.NewSource(
			file.WithPath("./conf/" + app + ".yml"),
		)); err != nil {
			log.Fatalf("[loadAndWatchConfigFile] 加载应用配置文件 异常,%s", err)
			return err
		}
	}

	// 侦听文件变动
	watcher, err := config.Watch()
	if err != nil {
		log.Fatalf("[loadAndWatchConfigFile] 开始侦听应用配置文件变动 异常,%s", err)
		return err
	}

	go func() {
		for {
			v, err := watcher.Next()
			if err != nil {
				log.Fatalf("[loadAndWatchConfigFile] 侦听应用配置文件变动 异常, %s", err)
				return
			}

			log.Logf("[loadAndWatchConfigFile] 文件变动,%s", string(v.Bytes()))
		}
	}()

	return
}

首先,为了方便,我们把所有的配置都放到micro空间下,并写在一个配置文件micro.yml中。

先来看loadAndWatchConfigFile方法

main.go

func loadAndWatchConfigFile() (err error) {

	// 加载每个应用的配置文件
	for _, app := range apps {
		if err := config.Load(file.NewSource(
			file.WithPath("./conf/" + app + ".yml"),
		)); err != nil {
			return err
		}
	}

	// 侦听文件变动
	watcher, err := config.Watch()
	if err != nil {
		return err
	}

	go func() {
		for {
			v, err := watcher.Next()
			if err != nil {
				return
			}
		}
	}()

	return
}

loadAndWatchConfigFile方法把所有指定的配置文件加载到go-config中,然后通过go-configWatch来侦听文件变动。

如果文件有变动,config.get方法拿到的数据便会是最新的:

getConfig

func getConfig(appName string) *proto.ChangeSet {

	bytes := config.Get(appName).Bytes()

	log.Logf("[getConfig] appName:%s", appName)
	return &proto.ChangeSet{
		Data:      bytes,
		Checksum:  fmt.Sprintf("%x", md5.Sum(bytes)),
		Format:    "yml",
		Source:    "file",
		Timestamp: time.Now().Unix()}
}

proto.ChangeSet规范了配置返回格式:

  • Data 具体的配置比特码
  • Checksum 本次请求的数据签名,用于判断data是否传输完成
  • Format 格式
  • Source 配置源
  • Timestamp 返回服务端处理完成时的时间戳

proto.ChangeSetReadWatch返回:

func (s Service) Read(ctx context.Context, req *proto.ReadRequest) (rsp *proto.ReadResponse, err error) {

	appName := parsePath(req.Path)

	rsp = &proto.ReadResponse{
		ChangeSet: getConfig(appName),
	}
	return
}

func (s Service) Watch(req *proto.WatchRequest, server proto.Source_WatchServer) (err error) {

	appName := parsePath(req.Path)
	rsp := &proto.WatchResponse{
		ChangeSet: getConfig(appName),
	}
	if err = server.Send(rsp); err != nil {
		log.Logf("[Watch] 侦听处理异常,%s", err)
		return err
	}

	return
}

客户端通过Next方法与Watch交互,有变动时即更新本地配置。

client

有了服务端,我们还需要有客户端来调用和侦听配置。见代码:

config.go

package config

import (
	"fmt"
	"github.com/micro/go-micro/v2/config"
	"github.com/micro/go-micro/v2/util/log"
	"sync"
)

var (
	m      sync.RWMutex
	inited bool

	// 默认配置器
	c = &configurator{}
)

// Configurator 配置器
type Configurator interface {
	App(name string, config interface{}) (err error)
}

// configurator 配置器
type configurator struct {
	conf config.Config
}

func (c *configurator) App(name string, config interface{}) (err error) {

	v := c.conf.Get(name)
	if v != nil {
		err = v.Scan(config)
	} else {
		err = fmt.Errorf("[App] 配置不存在,err:%s", name)
	}

	return
}

// c 配置器
func C() Configurator {
	return c
}

func (c *configurator) init(ops Options) (err error) {
	m.Lock()
	defer m.Unlock()

	if inited {
		log.Logf("[init] 配置已经初始化过")
		return
	}

	c.conf = config.NewConfig()

	// 加载配置
	err = c.conf.Load(ops.Sources...)
	if err != nil {
		log.Fatal(err)
	}

	go func() {

		log.Logf("[init] 侦听配置变动 ...")

		// 开始侦听变动事件
		watcher, err := c.conf.Watch()
		if err != nil {
			log.Fatal(err)
		}

		for {
			v, err := watcher.Next()
			if err != nil {
				log.Fatal(err)
			}

			log.Logf("[init] 侦听配置变动: %v", string(v.Bytes()))
		}
	}()

	// 标记已经初始化
	inited = true
	return
}

// Init 初始化配置
func Init(opts ...Option) {

	ops := Options{}
	for _, o := range opts {
		o(&ops)
	}

	c = &configurator{}

	c.init(ops)
}

客户端有两个部分组成

  • Configurator 配置器 负责提供给客户端获取配置
  • Init/init 初始化接口与内部方法 负责初始化并同步配置服务器回传的配置变动

我们实现的客户端并不限制数据源是gRPC或者像etcd这样的资源服务都可以直接调用Init进行初始化。

应用代码

我们把所有应用的conf目录全部删掉,都改成从配置中心读取配置,但是实际情况中我们会通常会在应用中保留必要的配置文件及配置属性,但是这里我们不考虑太多。

main中主要结构:

var (
	appName = "user_web"
	cfg     = &userCfg{}
)

type userCfg struct {
	common.AppCfg
}

func main() {

	// 初始化配置
	initCfg()

	// 使用etcd注册
	// 创建新服务
	service := web.NewService(
		web.Name(cfg.Name),
		web.Version(cfg.Version),
		web.Registry(micReg),
		web.Address(cfg.Addr()),
	)

	// 初始化服务
	// ...
	// 运行服务
}

func registryOptions(ops *registry.Options) {

	etcdCfg := &common.Etcd{}
	err := config.C().App("etcd", etcdCfg)
	if err != nil {
		panic(err)
	}

	ops.Timeout = time.Second * 5
	ops.Addrs = []string{fmt.Sprintf("%s:%d", etcdCfg.Host, etcdCfg.Port)}
}

func initCfg() {

	source := grpc.NewSource(
		grpc.WithAddress("127.0.0.1:9600"),
		grpc.WithPath("micro"),
	)

	basic.Init(config.WithSource(source))

	err := config.C().App(appName, cfg)
	if err != nil {
		panic(err)
	}

	log.Infof("[initCfg] 配置,cfg:%v", cfg)

	return
}

main第一步执行初始化方法initCfg,在其中声明使用grpc配置源,并通过config.C().App初始化应用的定制配置。

改动的地方都比较简单,初始化等没有进行封装,这是为了让大家容易理解,在此不一一讲解。

总结

本篇,引入了配置中心,采用gRPC Server作为配置中心服务,并把所有应用的配置都放到配置中心统一管理。

不足点有:

  • gRPC Server并没有实现高可用与负载均衡,不过这超过了本篇的讨论范畴,我们会有专门的教程来实现。

参考阅读

系列文章