diff options
author | Elizabeth Hunt <elizabeth.hunt@simponic.xyz> | 2025-01-05 16:36:51 -0800 |
---|---|---|
committer | Elizabeth Hunt <elizabeth.hunt@simponic.xyz> | 2025-01-11 11:58:34 -0800 |
commit | 040994898b109b3f344b37d1d449eb3b8f58ec53 (patch) | |
tree | 04758f61df0034a398ba133a3b29e7feecf23b85 /template/ntfy/watcher.go | |
parent | 687caaa787f9114e390ef34cd06b0c0658cdeae2 (diff) | |
download | oldinfra-040994898b109b3f344b37d1d449eb3b8f58ec53.tar.gz oldinfra-040994898b109b3f344b37d1d449eb3b8f58ec53.zip |
add ntfy integration
Diffstat (limited to 'template/ntfy/watcher.go')
-rw-r--r-- | template/ntfy/watcher.go | 96 |
1 files changed, 96 insertions, 0 deletions
diff --git a/template/ntfy/watcher.go b/template/ntfy/watcher.go new file mode 100644 index 0000000..d8959cd --- /dev/null +++ b/template/ntfy/watcher.go @@ -0,0 +1,96 @@ +package ntfy + +import ( + "bufio" + "encoding/json" + "log" + "net/http" + "net/url" + "path" + "time" +) + +type Message struct { + Id string `json:"id"` + Time int `json:"time"` + Message string `json:"message"` + Event string `json:"event"` +} + +type NtfyWatcher struct { + Endpoint string + Topics []string +} + +func (w *NtfyWatcher) Watch() chan Message { + notifications := make(chan Message) + + for _, topic := range w.Topics { + log.Println("subscribing to topic:", topic) + + go func() { + retryCount := 5 + retryTime := 5 * time.Second + retries := retryCount + + sleepAndDecrementRetry := func() { + log.Println("waiting 5 seconds before reconnecting. retries left:", retries, "topic:", topic, "endpoint:", w.Endpoint) + time.Sleep(retryTime) + retries-- + } + + for true { + if retries == 0 { + log.Fatal("too many retries, exiting") + } + + endpoint, _ := url.JoinPath(w.Endpoint, path.Join(topic, "json")) + resp, err := http.Get(endpoint) + if err != nil { + log.Println("error connecting to endpoint:", err) + sleepAndDecrementRetry() + continue + } + + defer resp.Body.Close() + scanner := bufio.NewScanner(resp.Body) + for scanner.Scan() { + bytes := scanner.Bytes() + var msg Message + err := json.Unmarshal(bytes, &msg) + if err != nil { + log.Println("could not unmarshal message:", err) + continue + } + + if msg.Event == "keepalive" { + log.Println("received keepalive message") + continue + } + if msg.Event != "message" { + log.Println("received unknown event:", msg.Event) + continue + } + + log.Println("received notification:", msg) + notifications <- msg + retries = retryCount // reset retries + } + + if err := scanner.Err(); err != nil { + log.Println("error reading response body:", err) + sleepAndDecrementRetry() + } + } + }() + } + + return notifications +} + +func MakeNtfyWatcher(endpoint string, topics []string) *NtfyWatcher { + return &NtfyWatcher{ + Endpoint: endpoint, + Topics: topics, + } +} |