event-gateway/internal/sink/nomad/nomad.go

119 lines
2.4 KiB
Go

// Copyright © 2023 Roberto Hidalgo <event-gateway@un.rob.mx>
// SPDX-License-Identifier: Apache-2.0
package nomad
import (
"context"
"encoding/json"
"fmt"
"git.rob.mx/nidito/chinampa/pkg/logger"
"git.rob.mx/nidito/event-gateway/internal/payload"
"git.rob.mx/nidito/event-gateway/internal/sink/types"
"github.com/hashicorp/nomad/api"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
)
var log = logger.Sub("event-gateway.sink.nomad")
type Job struct {
Job string `json:"job"`
Namespace string `json:"namespace"`
Region string `json:"region"`
Payload bool `json:"payload"`
}
var _ types.Event = &Job{}
func (j *Job) Kind() types.Kind {
return types.Nomad
}
type Sink struct {
Client *api.Client
tracer trace.Tracer
}
var _ types.Sink = &Sink{}
func New() (types.Sink, error) {
cfg := api.DefaultConfig()
c, err := api.NewClient(cfg)
if err != nil {
return nil, err
}
return &Sink{
Client: c,
tracer: otel.Tracer("event-gateway.sink.nomad"),
}, nil
}
func (s *Sink) Parse(c *types.Config) (types.Event, error) {
target := &Job{}
if err := json.Unmarshal(c.Data, &target); err != nil {
return nil, fmt.Errorf("failed to parse nomad handler: %s", err)
}
return target, nil
}
func (s *Sink) Tracer() trace.Tracer {
return s.tracer
}
func (s *Sink) Dispatch(ctx context.Context, span trace.Span, event types.Event) error {
j := event.(*Job)
span.SetAttributes(
attribute.KeyValue{
Key: "job",
Value: attribute.StringValue(j.Job),
},
attribute.KeyValue{
Key: "namespace",
Value: attribute.StringValue(j.Namespace),
},
)
defer span.End()
var p []byte
if j.Payload {
data, err := payload.FromContext(ctx)
if err != nil {
return err
}
serialized, err := data.Encoded()
if err != nil {
return err
}
p = serialized
}
opts := &api.WriteOptions{}
if j.Namespace != "" {
opts.Namespace = j.Namespace
}
if j.Region != "" {
opts.Region = j.Region
}
meta := map[string]string{}
log.Infof("dispatching nomad job %s", j.Job)
span.AddEvent("nomad.call")
res, _, err := s.Client.Jobs().Dispatch(j.Job, meta, p, "", opts)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, "nomad.dispatch.fail")
return fmt.Errorf("could not dispatch nomad job: %s", err)
}
span.SetStatus(codes.Ok, "")
log.Infof("nomad job dispatched: %s (%s)", j.Job, res.DispatchedJobID)
return nil
}