70 lines
1.4 KiB
Go
70 lines
1.4 KiB
Go
|
// Copyright © 2023 Roberto Hidalgo <event-gateway@un.rob.mx>
|
||
|
// SPDX-License-Identifier: Apache-2.0
|
||
|
package sink
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"encoding/json"
|
||
|
"fmt"
|
||
|
|
||
|
"git.rob.mx/nidito/event-gateway/internal/sink/debug"
|
||
|
"git.rob.mx/nidito/event-gateway/internal/sink/nomad"
|
||
|
"git.rob.mx/nidito/event-gateway/internal/sink/types"
|
||
|
)
|
||
|
|
||
|
// registry maps a sink kind to the types.Sink instance that handles it.
type registry map[types.Kind]types.Sink
|
||
|
|
||
|
// sinks is the package-level registry consulted by ForKind, Parse and
// Dispatch. It starts out empty; Clear replaces it with a new empty registry.
var sinks = registry{}
|
||
|
|
||
|
func Clear() {
|
||
|
sinks = registry{}
|
||
|
}
|
||
|
|
||
|
func (r registry) ForKind(k types.Kind) (types.Sink, error) {
|
||
|
if _, ok := r[k]; !ok {
|
||
|
switch k {
|
||
|
case types.Nomad:
|
||
|
s, err := nomad.New()
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
r[k] = s
|
||
|
case types.Debug:
|
||
|
s, err := debug.New()
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
r[k] = s
|
||
|
}
|
||
|
}
|
||
|
return r[k], nil
|
||
|
}
|
||
|
|
||
|
func ForKind(k types.Kind) (types.Sink, error) {
|
||
|
return sinks.ForKind(k)
|
||
|
}
|
||
|
|
||
|
func Parse(config json.RawMessage) (types.Event, error) {
|
||
|
hc := &types.Config{Data: config}
|
||
|
if err := json.Unmarshal(config, &hc); err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
sink, err := sinks.ForKind(hc.Kind)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
return sink.Parse(hc)
|
||
|
}
|
||
|
|
||
|
func Dispatch(ctx context.Context, event types.Event) error {
|
||
|
sink, err := sinks.ForKind(event.Kind())
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
_, span := sink.Tracer().Start(ctx, fmt.Sprintf("dispatch.%s", event.Kind()))
|
||
|
defer span.End()
|
||
|
return sink.Dispatch(ctx, span, event)
|
||
|
}
|