Skip to content

Commit

Permalink
Merge pull request #47 from wenxuwan/master
Browse files Browse the repository at this point in the history
Exposure function of etcd
  • Loading branch information
AlexStocks authored Mar 10, 2021
2 parents fd94fb8 + bc1e1e2 commit 9f1418d
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 29 deletions.
48 changes: 30 additions & 18 deletions database/kv/etcd/v3/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,16 @@ var (
// NewConfigClient create new Client
func NewConfigClient(opts ...Option) *Client {
options := &Options{
heartbeat: 1, // default heartbeat
Heartbeat: 1, // default Heartbeat
}
for _, opt := range opts {
opt(options)
}

newClient, err := NewClient(options.name, options.endpoints, options.timeout, options.heartbeat)
newClient, err := NewClient(options.Name, options.Endpoints, options.Timeout, options.Heartbeat)
if err != nil {
log.Printf("new etcd client (name{%s}, etcd addresses{%v}, timeout{%d}) = error{%v}",
options.name, options.endpoints, options.timeout, err)
log.Printf("new etcd client (Name{%s}, etcd addresses{%v}, Timeout{%d}) = error{%v}",
options.Name, options.Endpoints, options.Timeout, err)
}
return newClient
}
Expand Down Expand Up @@ -136,6 +136,11 @@ func (c *Client) stop() bool {
}
}

// GetCtx return client context
func (c *Client) GetCtx() context.Context {
return c.ctx
}

// Close close client
func (c *Client) Close() {
if c == nil {
Expand All @@ -155,7 +160,7 @@ func (c *Client) Close() {
if c.rawClient != nil {
c.clean()
}
log.Printf("etcd client{name:%s, endpoints:%s} exit now.", c.name, c.endpoints)
log.Printf("etcd client{Name:%s, Endpoints:%s} exit now.", c.name, c.endpoints)
}

func (c *Client) keepSession() error {
Expand All @@ -173,7 +178,7 @@ func (c *Client) keepSession() error {
func (c *Client) keepSessionLoop(s *concurrency.Session) {
defer func() {
c.Wait.Done()
log.Printf("etcd client {endpoints:%v, name:%s} keep goroutine game over.", c.endpoints, c.name)
log.Printf("etcd client {Endpoints:%v, Name:%s} keep goroutine game over.", c.endpoints, c.name)
}()

for {
Expand All @@ -194,16 +199,22 @@ func (c *Client) keepSessionLoop(s *concurrency.Session) {
}
}

func (c *Client) getRawClient() *clientv3.Client {
//GetRawClient return etcd raw client
func (c *Client) GetRawClient() *clientv3.Client {
c.lock.RLock()
defer c.lock.RUnlock()

return c.rawClient
}

//GetEndPoints return etcd endpoints
func (c *Client) GetEndPoints() []string {
return c.endpoints
}

// if k not exist will put k/v in etcd, otherwise return nil
func (c *Client) put(k string, v string, opts ...clientv3.OpOption) error {
rawClient := c.getRawClient()
rawClient := c.GetRawClient()

if rawClient == nil {
return ErrNilETCDV3Client
Expand All @@ -219,7 +230,7 @@ func (c *Client) put(k string, v string, opts ...clientv3.OpOption) error {
// if k not exist will put k/v in etcd
// if k is already exist in etcd, replace it
func (c *Client) update(k string, v string, opts ...clientv3.OpOption) error {
rawClient := c.getRawClient()
rawClient := c.GetRawClient()

if rawClient == nil {
return ErrNilETCDV3Client
Expand All @@ -233,7 +244,7 @@ func (c *Client) update(k string, v string, opts ...clientv3.OpOption) error {
}

func (c *Client) delete(k string) error {
rawClient := c.getRawClient()
rawClient := c.GetRawClient()

if rawClient == nil {
return ErrNilETCDV3Client
Expand All @@ -244,7 +255,7 @@ func (c *Client) delete(k string) error {
}

func (c *Client) get(k string) (string, error) {
rawClient := c.getRawClient()
rawClient := c.GetRawClient()

if rawClient == nil {
return "", ErrNilETCDV3Client
Expand All @@ -264,7 +275,7 @@ func (c *Client) get(k string) (string, error) {

// CleanKV delete all key and value
func (c *Client) CleanKV() error {
rawClient := c.getRawClient()
rawClient := c.GetRawClient()

if rawClient == nil {
return ErrNilETCDV3Client
Expand All @@ -274,8 +285,9 @@ func (c *Client) CleanKV() error {
return err
}

func (c *Client) getChildren(k string) ([]string, []string, error) {
rawClient := c.getRawClient()
//GetChildren return node children
func (c *Client) GetChildren(k string) ([]string, []string, error) {
rawClient := c.GetRawClient()

if rawClient == nil {
return nil, nil, ErrNilETCDV3Client
Expand All @@ -300,7 +312,7 @@ func (c *Client) getChildren(k string) ([]string, []string, error) {
}

func (c *Client) watchWithPrefix(prefix string) (clientv3.WatchChan, error) {
rawClient := c.getRawClient()
rawClient := c.GetRawClient()

if rawClient == nil {
return nil, ErrNilETCDV3Client
Expand All @@ -310,7 +322,7 @@ func (c *Client) watchWithPrefix(prefix string) (clientv3.WatchChan, error) {
}

func (c *Client) watch(k string) (clientv3.WatchChan, error) {
rawClient := c.getRawClient()
rawClient := c.GetRawClient()

if rawClient == nil {
return nil, ErrNilETCDV3Client
Expand All @@ -320,7 +332,7 @@ func (c *Client) watch(k string) (clientv3.WatchChan, error) {
}

func (c *Client) keepAliveKV(k string, v string) error {
rawClient := c.getRawClient()
rawClient := c.GetRawClient()

if rawClient == nil {
return ErrNilETCDV3Client
Expand Down Expand Up @@ -389,7 +401,7 @@ func (c *Client) RegisterTemp(k, v string) error {

// GetChildrenKVList gets children kv list by @k
func (c *Client) GetChildrenKVList(k string) ([]string, []string, error) {
kList, vList, err := c.getChildren(k)
kList, vList, err := c.GetChildren(k)
return kList, vList, perrors.WithMessagef(err, "get key children (key %s)", k)
}

Expand Down
27 changes: 16 additions & 11 deletions database/kv/etcd/v3/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,24 @@ const (
ConnDelay = 3
// MaxFailTimes max failure times
MaxFailTimes = 15
// RegistryETCDV3Client client name
// RegistryETCDV3Client client Name
RegistryETCDV3Client = "etcd registry"
// MetadataETCDV3Client client name
// MetadataETCDV3Client client Name
MetadataETCDV3Client = "etcd metadata"
)

// Options client configuration
type Options struct {
name string
endpoints []string
client *Client
timeout time.Duration
heartbeat int // heartbeat second
//Name etcd server name
Name string
//Endpoints etcd endpoints
Endpoints []string
//Client etcd client
Client *Client
//Timeout timeout
Timeout time.Duration
//Heartbeat second
Heartbeat int
}

// Option will define a function of handling Options
Expand All @@ -47,27 +52,27 @@ type Option func(*Options)
// WithEndpoints sets etcd client endpoints
func WithEndpoints(endpoints ...string) Option {
return func(opt *Options) {
opt.endpoints = endpoints
opt.Endpoints = endpoints
}
}

// WithName sets etcd client name
func WithName(name string) Option {
return func(opt *Options) {
opt.name = name
opt.Name = name
}
}

// WithTimeout sets etcd client timeout
func WithTimeout(timeout time.Duration) Option {
return func(opt *Options) {
opt.timeout = timeout
opt.Timeout = timeout
}
}

// WithHeartbeat sets etcd client heartbeat
func WithHeartbeat(heartbeat int) Option {
return func(opt *Options) {
opt.heartbeat = heartbeat
opt.Heartbeat = heartbeat
}
}

0 comments on commit 9f1418d

Please sign in to comment.