event-gateway/internal/source/manager.go

112 lines
2.7 KiB
Go
Raw Normal View History

2024-04-20 20:31:06 +00:00
// Copyright © 2023 Roberto Hidalgo <event-gateway@un.rob.mx>
// SPDX-License-Identifier: Apache-2.0
package source
import (
"fmt"
"git.rob.mx/nidito/chinampa/pkg/logger"
"git.rob.mx/nidito/event-gateway/internal/config"
"git.rob.mx/nidito/event-gateway/internal/source/types"
)
// log is the package-level logger used throughout this file; it is obtained
// from logger.Sub under the name "event-gateway.manager".
var log = logger.Sub("event-gateway.manager")
// Manager keeps track of the available event sources and of the listeners
// currently registered with them, applying configuration updates as they
// arrive from the configured loader.
type Manager struct {
// Sources holds the known sources keyed by their kind; populated by AddSource.
Sources map[types.Kind]config.Source
// Config is the loader used to load and watch listener configuration; set by Listen.
Config config.Loader
// Ready is set to true whenever onUpdate finishes applying a set of listeners.
Ready bool
// listeners holds the listeners from the last applied update, keyed by ID.
listeners map[string]*config.Listener
}
// AddSource stores src in the manager under the kind it reports, creating
// the Sources map on first use. A source previously stored under the same
// kind is replaced.
func (m *Manager) AddSource(src config.Source) {
	if m.Sources == nil {
		m.Sources = make(map[types.Kind]config.Source)
	}

	kind := src.Kind()
	m.Sources[kind] = src
}
// Listen stores cfg as the manager's configuration loader, initializes every
// source added so far, and then starts loading and watching configuration.
//
// It returns an error when the initial configuration load fails. The
// underlying error is wrapped, so callers can inspect it with errors.Is and
// errors.As.
func (m *Manager) Listen(cfg config.Loader) error {
	m.Config = cfg

	for _, src := range m.Sources {
		src.Initialize()
	}

	if err := m.watchConfig(); err != nil {
		return fmt.Errorf("could not initialize config loading: %w", err)
	}

	return nil
}
// watchConfig loads the initial configuration through m.Config, applies it,
// and then hands an updates channel to m.Config.Watch so later changes get
// applied as well.
//
// Every set of listeners received on the channel is passed to onUpdate from
// a dedicated goroutine; failures there are logged and do not stop the loop.
// The returned error wraps the one reported by the initial Load, if any.
func (m *Manager) watchConfig() error {
	log.Debugf("Loading configuration with %s", m.Config)
	res, err := m.Config.Load()
	if err != nil {
		// Nothing has been started yet, so there is no goroutine to clean up.
		return fmt.Errorf("error loading config using %s: %w", m.Config, err)
	}

	// Buffered by one so the initial listeners can be queued without waiting
	// for the consumer goroutine to be scheduled.
	updates := make(chan []*config.Listener, 1)

	// The consumer is only started once the initial load succeeded, so a
	// failed load does not leave a goroutine blocked forever on the channel.
	// Ranging over the channel also ends the goroutine if the channel is ever
	// closed, instead of spinning on zero-value receives.
	go func() {
		for listeners := range updates {
			log.Debugf("manager got updated listeners with count %d", len(listeners))
			if err := m.onUpdate(listeners); err != nil {
				log.Errorf("could not process updated listeners: %s", err)
			}
		}
	}()

	log.Debugf("Loaded configuration with %s, starting watch", m.Config)
	updates <- res
	m.Config.Watch(updates)

	return nil
}
// onUpdate reconciles the listeners registered with the sources against the
// given set, which is expected to be the complete desired configuration.
//
// For every listener:
//   - unchanged ones (same ID and Hash as the last applied update) are kept;
//   - changed ones are deregistered from the source that held the previous
//     registration and registered again with their current source;
//   - new ones are registered.
//
// Listeners applied previously that are missing from the given set are
// deregistered. All listeners are validated before any source is touched, so
// an error is returned without side effects when a listener refers to an
// unknown source kind or has no Event configured. Registration failures are
// logged and leave that listener untracked, so a later update retries it.
func (m *Manager) onUpdate(listeners []*config.Listener) error {
	// Validate up front: failing halfway through would leave sources modified
	// while m.listeners still describes the previous state.
	for _, l := range listeners {
		if _, srcExists := m.Sources[l.Source.Kind]; !srcExists {
			return fmt.Errorf("unknown source kind <%s> for %s: %+v", l.Source.Kind, l.ID, l)
		}
		if l.Event == nil {
			return fmt.Errorf("sink configuration for %s not provided: %+v", l.ID, l)
		}
	}

	updated := make(map[string]*config.Listener, len(listeners))
	// IDs whose registration failed during this update; any previous
	// registration for them has already been removed below.
	failed := map[string]struct{}{}

	for _, l := range listeners {
		src := m.Sources[l.Source.Kind]
		action := "registered"

		if existing, exists := m.listeners[l.ID]; exists {
			if l.Hash == existing.Hash {
				log.Infof("Same configuration for %s, will not reload", l.ID)
				updated[l.ID] = l
				continue
			}

			log.Infof("Reloading configuration for %s", l.ID)
			// Remove the old registration from the source that actually holds
			// it; the source kind may have changed between configurations.
			if oldSrc, ok := m.Sources[existing.Source.Kind]; ok {
				oldSrc.Deregister(l.ID)
			}
			action = "reloaded"
		} else {
			log.Debugf("registering %s with source %s", l.ID, l.Source.Kind)
		}

		if err := src.Register(l); err != nil {
			log.Errorf("could not register %s with source %s: %s", l.ID, l.Source.Kind, err)
			// Do not track it as registered, so the next update tries again.
			failed[l.ID] = struct{}{}
			continue
		}

		log.Infof("%s %s in source %s", action, l.ID, l.Source.Kind)
		updated[l.ID] = l
	}

	for id, l := range m.listeners {
		if _, exists := updated[id]; exists {
			continue
		}
		if _, alreadyRemoved := failed[id]; alreadyRemoved {
			// Deregistered above as part of a reload that could not complete.
			continue
		}

		log.Infof("Deleting listener for %s as it's no longer in configuration", l.ID)
		if src, ok := m.Sources[l.Source.Kind]; ok {
			src.Deregister(id)
		}
	}

	m.listeners = updated
	m.Ready = true

	return nil
}