// Copyright © 2023 Roberto Hidalgo // SPDX-License-Identifier: Apache-2.0 package nomad import ( "context" "encoding/json" "fmt" "git.rob.mx/nidito/chinampa/pkg/logger" "git.rob.mx/nidito/event-gateway/internal/payload" "git.rob.mx/nidito/event-gateway/internal/sink/types" "github.com/hashicorp/nomad/api" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" ) var log = logger.Sub("event-gateway.sink.nomad") type Job struct { Job string `json:"job"` Namespace string `json:"namespace"` Region string `json:"region"` Payload bool `json:"payload"` } var _ types.Event = &Job{} func (j *Job) Kind() types.Kind { return types.Nomad } type Sink struct { Client *api.Client tracer trace.Tracer } var _ types.Sink = &Sink{} func New() (types.Sink, error) { cfg := api.DefaultConfig() c, err := api.NewClient(cfg) if err != nil { return nil, err } return &Sink{ Client: c, tracer: otel.Tracer("event-gateway.sink.nomad"), }, nil } func (s *Sink) Parse(c *types.Config) (types.Event, error) { target := &Job{} if err := json.Unmarshal(c.Data, &target); err != nil { return nil, fmt.Errorf("failed to parse nomad handler: %s", err) } return target, nil } func (s *Sink) Tracer() trace.Tracer { return s.tracer } func (s *Sink) Dispatch(ctx context.Context, span trace.Span, event types.Event) error { j := event.(*Job) span.SetAttributes( attribute.KeyValue{ Key: "job", Value: attribute.StringValue(j.Job), }, attribute.KeyValue{ Key: "namespace", Value: attribute.StringValue(j.Namespace), }, ) defer span.End() var p []byte if j.Payload { data, err := payload.FromContext(ctx) if err != nil { return err } serialized, err := data.Encoded() if err != nil { return err } p = serialized } opts := &api.WriteOptions{} if j.Namespace != "" { opts.Namespace = j.Namespace } if j.Region != "" { opts.Region = j.Region } meta := map[string]string{} log.Infof("dispatching nomad job %s", j.Job) span.AddEvent("nomad.call") res, _, err := s.Client.Jobs().Dispatch(j.Job, meta, p, "", opts) if err != nil { span.RecordError(err) span.SetStatus(codes.Error, "nomad.dispatch.fail") return fmt.Errorf("could not dispatch nomad job: %s", err) } span.SetStatus(codes.Ok, "") log.Infof("nomad job dispatched: %s (%s)", j.Job, res.DispatchedJobID) return nil }