From ad2a70aaf3a9d95768d2aa04b1e6dc466e54132b Mon Sep 17 00:00:00 2001 From: John Costa Date: Mon, 12 May 2025 20:51:06 +0100 Subject: [PATCH] feat: multiple listeners on channels Multiple device notifications supported. Actually beautiful --- backend/events.go | 30 +++++++++++++++++++++++++++--- backend/notifications.go | 39 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 66 insertions(+), 3 deletions(-) diff --git a/backend/events.go b/backend/events.go index 2c6bcf4..401dd71 100644 --- a/backend/events.go +++ b/backend/events.go @@ -9,6 +9,7 @@ import ( "os" "screenmark/screenmark/agents" "screenmark/screenmark/models" + "strconv" "time" "github.com/charmbracelet/log" @@ -132,6 +133,10 @@ func ListenProcessingImageStatus(db *sql.DB, images models.ImageModel, notifier } func CreateEventsHandler(notifier *Notifier[Notification]) http.HandlerFunc { + counter := 0 + + userSplitters := make(map[string]*ChannelSplitter[Notification]) + return func(w http.ResponseWriter, r *http.Request) { userId := r.Context().Value(USER_ID).(uuid.UUID) if userId == uuid.Nil { @@ -144,6 +149,27 @@ func CreateEventsHandler(notifier *Notifier[Notification]) http.HandlerFunc { w.Header().Set("Connection", "keep-alive") // w.(http.Flusher).Flush() + if _, exists := notifier.Listeners[userId.String()]; !exists { + notifier.Create(userId.String()) + } + + userNotifications := notifier.Listeners[userId.String()] + + if _, exists := userSplitters[userId.String()]; !exists { + splitter := NewChannelSplitter(userNotifications) + + userSplitters[userId.String()] = &splitter + splitter.Listen() + } + + splitter := userSplitters[userId.String()] + + id := strconv.Itoa(counter) + counter += 1 + + notifications := splitter.Add(id) + defer splitter.Remove(id) + // if err := notifier.Create(userId.String()); err != nil { // // TODO: this could be better. // // EG: The user could attempt to create many connections @@ -154,15 +180,13 @@ func CreateEventsHandler(notifier *Notifier[Notification]) http.HandlerFunc { // return // } - listener := notifier.Listeners[userId.String()] - for { select { case <-r.Context().Done(): fmt.Fprint(w, "event: close\ndata: Connection closed\n\n") w.(http.Flusher).Flush() return - case msg := <-listener: + case msg := <-notifications: msgString, err := json.Marshal(msg) if err != nil { diff --git a/backend/notifications.go b/backend/notifications.go index 466eb0f..dc3f5ca 100644 --- a/backend/notifications.go +++ b/backend/notifications.go @@ -56,3 +56,42 @@ func NewNotifier[TNotification any](bufferSize int) Notifier[TNotification] { Listeners: make(map[string]chan TNotification), } } + +// ---------------------------------- + +type ChannelSplitter[TNotification any] struct { + ch chan TNotification + + Listeners map[string]chan TNotification +} + +func (s *ChannelSplitter[TNotification]) Listen() { + go func() { + for { + select { + case msg := <-s.ch: + for _, v := range s.Listeners { + v <- msg + } + } + } + }() +} + +func (s *ChannelSplitter[TNotification]) Add(id string) chan TNotification { + ch := make(chan TNotification) + s.Listeners[id] = ch + + return ch +} + +func (s *ChannelSplitter[TNotification]) Remove(id string) { + delete(s.Listeners, id) +} + +func NewChannelSplitter[TNotification any](ch chan TNotification) ChannelSplitter[TNotification] { + return ChannelSplitter[TNotification]{ + ch: ch, + Listeners: make(map[string]chan TNotification), + } +}