event-gateway/internal/source/manager_test.go

189 lines
4.4 KiB
Go

// Copyright © 2023 Roberto Hidalgo <event-gateway@un.rob.mx>
// SPDX-License-Identifier: Apache-2.0
package source_test
import (
"bytes"
"net/http/httptest"
"syscall"
"testing"
"testing/fstest"
"time"
"git.rob.mx/nidito/event-gateway/internal/config"
"git.rob.mx/nidito/event-gateway/internal/source"
"git.rob.mx/nidito/event-gateway/internal/source/http"
"github.com/julienschmidt/httprouter"
"github.com/sirupsen/logrus"
)
func TestManagerLifecycle(t *testing.T) {
logrus.SetLevel(logrus.DebugLevel)
payload := bytes.NewBufferString(`{"test": "test"}`)
headers := map[string]string{
"content-type": "application/json",
}
router := httprouter.New()
src := &http.Source{Router: router}
manager := &source.Manager{}
manager.AddSource(src)
cfg, err := config.FromURN("file://config.json")
if err != nil {
t.Fatalf("could not initialize configuration loader: %s", err)
}
// config loads
t.Run("config loads", func(t *testing.T) {
config.FS = fstest.MapFS{
"config.json": &fstest.MapFile{
Data: []byte(`{
"event-id": {
"source": {
"kind": "http",
"path": "some-path"
},
"sink": {
"kind": "debug"
}
}
}`),
},
}
if err := manager.Listen(cfg); err != nil {
t.Fatalf("could not start listener: %s", err)
}
time.Sleep(10 * time.Millisecond)
res := testEvent("/-/some-path", payload, headers, router)
body := res.Body.String()
if body != "processed" {
t.Fatalf("unexpected response wanted ok, got: %s", body)
}
})
t.Run("config updates and deletes", func(t *testing.T) {
config.FS = &fstest.MapFS{
"config.json": &fstest.MapFile{
Data: []byte(`{}`),
},
}
syscall.Kill(syscall.Getpid(), syscall.SIGHUP)
time.Sleep(10 * time.Millisecond)
res := testEvent("/-/some-path", payload, headers, router)
body := res.Body.String()
if body != "ignored" {
t.Fatalf("unexpected response wanted ignored, got: %s", body)
}
})
t.Run("config updates and re-adds", func(t *testing.T) {
config.FS = &fstest.MapFS{
"config.json": &fstest.MapFile{
Data: []byte(`{
"event-id": {
"source": {
"kind": "http",
"path": "some-path"
},
"sink": {
"kind": "debug"
}
}
}`),
},
}
syscall.Kill(syscall.Getpid(), syscall.SIGHUP)
time.Sleep(10 * time.Millisecond)
res := testEvent("/-/some-path", payload, headers, router)
body := res.Body.String()
if body != "processed" {
t.Fatalf("unexpected response wanted processed, got: %s", body)
}
})
t.Run("config replaces", func(t *testing.T) {
config.FS = &fstest.MapFS{
"config.json": &fstest.MapFile{
Data: []byte(`{
"replaced": {
"source": {
"kind": "http",
"path": "some-other-path"
},
"sink": {
"kind": "debug"
}
}
}`),
},
}
syscall.Kill(syscall.Getpid(), syscall.SIGHUP)
time.Sleep(10 * time.Millisecond)
res := testEvent("/-/some-other-path", payload, headers, router)
body := res.Body.String()
if body != "processed" {
t.Fatalf("unexpected response wanted processed, got: %s", body)
}
})
t.Run("config updates listener with changes", func(t *testing.T) {
config.FS = &fstest.MapFS{
"config.json": &fstest.MapFile{
Data: []byte(`{
"replaced": {
"source": {
"kind": "http",
"path": "some-path"
},
"sink": {
"kind": "debug"
}
}
}`),
},
}
syscall.Kill(syscall.Getpid(), syscall.SIGHUP)
time.Sleep(10 * time.Millisecond)
res := testEvent("/-/some-path", payload, headers, router)
body := res.Body.String()
if body != "processed" {
t.Fatalf("unexpected response wanted processed, got: %s", body)
}
})
t.Run("config reloads without changes", func(t *testing.T) {
syscall.Kill(syscall.Getpid(), syscall.SIGHUP)
time.Sleep(10 * time.Millisecond)
res := testEvent("/-/some-path", payload, headers, router)
body := res.Body.String()
if body != "processed" {
t.Fatalf("unexpected response wanted processed, got: %s", body)
}
})
}
func testEvent(path string, payload *bytes.Buffer, headers map[string]string, router *httprouter.Router) *httptest.ResponseRecorder {
res := httptest.NewRecorder()
logrus.Debugf("Calling %s", path)
req := httptest.NewRequest("POST", path, payload)
for k, v := range headers {
req.Header.Add(k, v)
}
router.ServeHTTP(res, req)
return res
}