event-gateway/internal/sink/sink.go

70 lines
1.4 KiB
Go
Raw Permalink Normal View History

2024-04-20 20:31:06 +00:00
// Copyright © 2023 Roberto Hidalgo <event-gateway@un.rob.mx>
// SPDX-License-Identifier: Apache-2.0
package sink
import (
"context"
"encoding/json"
"fmt"
"git.rob.mx/nidito/event-gateway/internal/sink/debug"
"git.rob.mx/nidito/event-gateway/internal/sink/nomad"
"git.rob.mx/nidito/event-gateway/internal/sink/types"
)
// registry associates a sink kind with the sink instance that serves it.
type registry map[types.Kind]types.Sink

// sinks is the package-level registry shared by the functions in this file.
// It starts out empty.
var sinks = make(registry)
func Clear() {
sinks = registry{}
}
// ForKind returns the sink stored in r for kind k, constructing and caching it
// on first use.
//
// Only types.Nomad and types.Debug can be constructed here. Any other kind
// yields an error instead of a nil sink, so callers never end up invoking
// methods on a nil types.Sink. When a constructor fails its error is returned
// unchanged and nothing is cached, so a later call attempts construction again.
//
// NOTE(review): the map is read and written without any locking in this
// method — confirm callers do not use the registry concurrently.
func (r registry) ForKind(k types.Kind) (types.Sink, error) {
	// Single lookup: reuse the cached instance when one exists.
	if s, ok := r[k]; ok {
		return s, nil
	}

	var (
		s   types.Sink
		err error
	)
	switch k {
	case types.Nomad:
		s, err = nomad.New()
	case types.Debug:
		s, err = debug.New()
	default:
		return nil, fmt.Errorf("unknown sink kind: %v", k)
	}
	if err != nil {
		return nil, err
	}

	r[k] = s
	return s, nil
}
// ForKind resolves the sink for kind k through the package-level registry and
// hands back that lookup's result and error unchanged.
func ForKind(k types.Kind) (types.Sink, error) {
	s, err := sinks.ForKind(k)
	return s, err
}
// Parse turns a raw JSON sink configuration into a types.Event.
//
// The payload is unmarshaled into a types.Config that is pre-populated with
// the raw bytes in its Data field. The decoded Kind selects a sink from the
// package-level registry, and that sink's Parse method produces the event.
//
// An error is returned when the JSON cannot be decoded, when the registry
// cannot supply a sink for the decoded kind, or when the sink's own Parse
// fails.
func Parse(config json.RawMessage) (types.Event, error) {
	hc := &types.Config{Data: config}
	// Unmarshal into hc itself rather than &hc: with a pointer-to-pointer
	// target, a JSON `null` payload resets hc to nil and the hc.Kind read
	// below would panic.
	if err := json.Unmarshal(config, hc); err != nil {
		return nil, err
	}

	sink, err := sinks.ForKind(hc.Kind)
	if err != nil {
		return nil, err
	}
	// Guard against a registry lookup that yields no sink and no error, which
	// would otherwise panic on the method call below.
	if sink == nil {
		return nil, fmt.Errorf("no sink available for kind %v", hc.Kind)
	}

	return sink.Parse(hc)
}
// Dispatch delivers event to the sink that handles event.Kind().
//
// The sink is looked up in the package-level registry. A span named
// "dispatch.<kind>" is started from the sink's tracer, passed to the sink's
// Dispatch method together with ctx and the event, and ended when this
// function returns. The sink's Dispatch result is returned as-is.
//
// An error is returned without dispatching when the registry lookup fails or
// yields no sink for the event's kind.
func Dispatch(ctx context.Context, event types.Event) error {
	sink, err := sinks.ForKind(event.Kind())
	if err != nil {
		return err
	}
	// Guard against a registry lookup that yields no sink and no error, which
	// would otherwise panic on sink.Tracer() below.
	if sink == nil {
		return fmt.Errorf("no sink available for kind %v", event.Kind())
	}

	// NOTE(review): the context returned by Start is discarded and the
	// caller's ctx is forwarded; the span reaches the sink only through the
	// explicit argument. Confirm this is intended.
	_, span := sink.Tracer().Start(ctx, fmt.Sprintf("dispatch.%s", event.Kind()))
	defer span.End()

	return sink.Dispatch(ctx, span, event)
}