// Copyright © 2023 Roberto Hidalgo // SPDX-License-Identifier: Apache-2.0 package source_test import ( "bytes" "net/http/httptest" "syscall" "testing" "testing/fstest" "time" "git.rob.mx/nidito/event-gateway/internal/config" "git.rob.mx/nidito/event-gateway/internal/source" "git.rob.mx/nidito/event-gateway/internal/source/http" "github.com/julienschmidt/httprouter" "github.com/sirupsen/logrus" ) func TestManagerLifecycle(t *testing.T) { logrus.SetLevel(logrus.DebugLevel) payload := bytes.NewBufferString(`{"test": "test"}`) headers := map[string]string{ "content-type": "application/json", } router := httprouter.New() src := &http.Source{Router: router} manager := &source.Manager{} manager.AddSource(src) cfg, err := config.FromURN("file://config.json") if err != nil { t.Fatalf("could not initialize configuration loader: %s", err) } // config loads t.Run("config loads", func(t *testing.T) { config.FS = fstest.MapFS{ "config.json": &fstest.MapFile{ Data: []byte(`{ "event-id": { "source": { "kind": "http", "path": "some-path" }, "sink": { "kind": "debug" } } }`), }, } if err := manager.Listen(cfg); err != nil { t.Fatalf("could not start listener: %s", err) } time.Sleep(10 * time.Millisecond) res := testEvent("/-/some-path", payload, headers, router) body := res.Body.String() if body != "processed" { t.Fatalf("unexpected response wanted ok, got: %s", body) } }) t.Run("config updates and deletes", func(t *testing.T) { config.FS = &fstest.MapFS{ "config.json": &fstest.MapFile{ Data: []byte(`{}`), }, } syscall.Kill(syscall.Getpid(), syscall.SIGHUP) time.Sleep(10 * time.Millisecond) res := testEvent("/-/some-path", payload, headers, router) body := res.Body.String() if body != "ignored" { t.Fatalf("unexpected response wanted ignored, got: %s", body) } }) t.Run("config updates and re-adds", func(t *testing.T) { config.FS = &fstest.MapFS{ "config.json": &fstest.MapFile{ Data: []byte(`{ "event-id": { "source": { "kind": "http", "path": "some-path" }, "sink": { "kind": "debug" } } }`), }, } syscall.Kill(syscall.Getpid(), syscall.SIGHUP) time.Sleep(10 * time.Millisecond) res := testEvent("/-/some-path", payload, headers, router) body := res.Body.String() if body != "processed" { t.Fatalf("unexpected response wanted processed, got: %s", body) } }) t.Run("config replaces", func(t *testing.T) { config.FS = &fstest.MapFS{ "config.json": &fstest.MapFile{ Data: []byte(`{ "replaced": { "source": { "kind": "http", "path": "some-other-path" }, "sink": { "kind": "debug" } } }`), }, } syscall.Kill(syscall.Getpid(), syscall.SIGHUP) time.Sleep(10 * time.Millisecond) res := testEvent("/-/some-other-path", payload, headers, router) body := res.Body.String() if body != "processed" { t.Fatalf("unexpected response wanted processed, got: %s", body) } }) t.Run("config updates listener with changes", func(t *testing.T) { config.FS = &fstest.MapFS{ "config.json": &fstest.MapFile{ Data: []byte(`{ "replaced": { "source": { "kind": "http", "path": "some-path" }, "sink": { "kind": "debug" } } }`), }, } syscall.Kill(syscall.Getpid(), syscall.SIGHUP) time.Sleep(10 * time.Millisecond) res := testEvent("/-/some-path", payload, headers, router) body := res.Body.String() if body != "processed" { t.Fatalf("unexpected response wanted processed, got: %s", body) } }) t.Run("config reloads without changes", func(t *testing.T) { syscall.Kill(syscall.Getpid(), syscall.SIGHUP) time.Sleep(10 * time.Millisecond) res := testEvent("/-/some-path", payload, headers, router) body := res.Body.String() if body != "processed" { t.Fatalf("unexpected response wanted processed, got: %s", body) } }) } func testEvent(path string, payload *bytes.Buffer, headers map[string]string, router *httprouter.Router) *httptest.ResponseRecorder { res := httptest.NewRecorder() logrus.Debugf("Calling %s", path) req := httptest.NewRequest("POST", path, payload) for k, v := range headers { req.Header.Add(k, v) } router.ServeHTTP(res, req) return res }