event-gateway/internal/source/consul/watch.go

70 lines
1.7 KiB
Go
Raw Permalink Normal View History

2024-04-20 20:31:06 +00:00
// Copyright © 2023 Roberto Hidalgo <event-gateway@un.rob.mx>
// SPDX-License-Identifier: Apache-2.0
package consul
import (
"context"
"time"
"git.rob.mx/nidito/event-gateway/internal/payload"
"git.rob.mx/nidito/event-gateway/internal/sink"
"git.rob.mx/nidito/event-gateway/internal/sink/types"
"github.com/cenkalti/backoff/v4"
"github.com/hashicorp/consul/api"
)
type watch struct {
Name string
DC string `json:"dc"`
Filter *string `json:"filter,omitempty"`
Transform *string `json:"transform,omitempty"`
Event types.Event
lastIndex uint64
cancel context.CancelFunc
client Client
}
func (c *watch) Watch(client Client) {
log.Infof("Starting lookup for %s", c.Name)
bgctx, cancel := context.WithCancel(context.Background())
c.cancel = cancel
ctx := backoff.WithContext(backoff.NewExponentialBackOff(), bgctx)
onUpdateError := func(err error, cooldown time.Duration) {
log.Errorf("Could not lookup %s, retrying in %vs: %v", c.Name, cooldown.Truncate(time.Second), err)
}
c.client = client
go func() {
for {
if err := backoff.RetryNotify(c.fetch, ctx, onUpdateError); err != nil {
continue
}
}
}()
}
func (c *watch) fetch() error {
opts := &api.QueryOptions{
WaitTime: 10 * time.Minute,
WaitIndex: c.lastIndex,
}
evts, meta, err := c.client.List(c.Name, opts)
if err != nil {
return err
}
c.lastIndex = meta.LastIndex
for _, evt := range evts {
log.Infof("Incoming event from source: consul:%s", c.Name)
ctx := context.WithValue(context.Background(), payload.ContextKey, evt.Payload)
// ctx = context.WithValue(ctx, "event", "consul:"+c.Name)
if err := sink.Dispatch(ctx, c.Event); err != nil {
log.Errorf("could not trigger sink from consul:%s %s\n", c.Name, err)
}
}
return nil
}