// Copyright © 2023 Roberto Hidalgo // SPDX-License-Identifier: Apache-2.0 package config import ( // "fmt" // "os" // "os/signal" // "syscall" "context" "encoding/json" "fmt" "strings" "time" "git.rob.mx/nidito/chinampa/pkg/logger" "github.com/cenkalti/backoff" "github.com/hashicorp/consul/api" ) var clogger = logger.Sub("event-gateway.config.consul") type Consul struct { Prefix string client *api.Client lastIndex uint64 cancel context.CancelFunc registry chan []*Listener } func NewConsul(prefix string) (Loader, error) { cfg := api.DefaultConfig() client, err := api.NewClient(cfg) if err != nil { return nil, fmt.Errorf("could not initialize Consul configuration client loader: %s", err) } return &Consul{Prefix: prefix, client: client}, nil } var _ Loader = &Consul{} func (c *Consul) String() string { return fmt.Sprintf("ConsulLoader at %s", c.Prefix) } func (c *Consul) Load() (res []*Listener, err error) { opts := &api.QueryOptions{WaitIndex: c.lastIndex, WaitTime: 10 * time.Minute} clogger.Debugf("Querying kv at %s", c.Prefix) pairs, meta, err := c.client.KV().List(c.Prefix, opts) if err != nil { return res, fmt.Errorf("failed querying consul for config: %s", err) } clogger.Debugf("Querying ok: %+v, %d items found", meta.LastIndex, len(pairs)) c.lastIndex = meta.LastIndex for _, p := range pairs { l := &Listener{ID: strings.TrimPrefix(p.Key, c.Prefix+"/")} clogger.Tracef("decoding config for %s", l.ID) if err := json.Unmarshal(p.Value, l); err != nil { return res, fmt.Errorf("could not decode config for %s/%s:%s", c.Prefix, p.Key, err) } clogger.Tracef("decoded config for %s", l.ID) res = append(res, l) } return res, nil } func (c *Consul) fetch() error { clogger.Info("Watch expired, reloading consul configuration") res, err := c.Load() if err != nil { return err } clogger.Info("Reloaded consul configuration") c.registry <- res return nil } func (c *Consul) Watch(reg chan []*Listener) { clogger.Infof("Starting config watch for %s", c.Prefix) bgctx, cancel := context.WithCancel(context.Background()) c.cancel = cancel c.registry = reg ctx := backoff.WithContext(backoff.NewExponentialBackOff(), bgctx) onUpdateError := func(err error, cooldown time.Duration) { clogger.Errorf("%s. Retrying in %vs", err, cooldown.Truncate(time.Second)) } go func() { for { if err := backoff.RetryNotify(c.fetch, ctx, onUpdateError); err != nil { continue } } }() }