feat: multiple listeners on channels
Multiple device notifications supported. Actually beautiful
This commit is contained in:
@ -9,6 +9,7 @@ import (
|
||||
"os"
|
||||
"screenmark/screenmark/agents"
|
||||
"screenmark/screenmark/models"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/charmbracelet/log"
|
||||
@ -132,6 +133,10 @@ func ListenProcessingImageStatus(db *sql.DB, images models.ImageModel, notifier
|
||||
}
|
||||
|
||||
func CreateEventsHandler(notifier *Notifier[Notification]) http.HandlerFunc {
|
||||
counter := 0
|
||||
|
||||
userSplitters := make(map[string]*ChannelSplitter[Notification])
|
||||
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
userId := r.Context().Value(USER_ID).(uuid.UUID)
|
||||
if userId == uuid.Nil {
|
||||
@ -144,6 +149,27 @@ func CreateEventsHandler(notifier *Notifier[Notification]) http.HandlerFunc {
|
||||
w.Header().Set("Connection", "keep-alive")
|
||||
// w.(http.Flusher).Flush()
|
||||
|
||||
if _, exists := notifier.Listeners[userId.String()]; !exists {
|
||||
notifier.Create(userId.String())
|
||||
}
|
||||
|
||||
userNotifications := notifier.Listeners[userId.String()]
|
||||
|
||||
if _, exists := userSplitters[userId.String()]; !exists {
|
||||
splitter := NewChannelSplitter(userNotifications)
|
||||
|
||||
userSplitters[userId.String()] = &splitter
|
||||
splitter.Listen()
|
||||
}
|
||||
|
||||
splitter := userSplitters[userId.String()]
|
||||
|
||||
id := strconv.Itoa(counter)
|
||||
counter += 1
|
||||
|
||||
notifications := splitter.Add(id)
|
||||
defer splitter.Remove(id)
|
||||
|
||||
// if err := notifier.Create(userId.String()); err != nil {
|
||||
// // TODO: this could be better.
|
||||
// // EG: The user could attempt to create many connections
|
||||
@ -154,15 +180,13 @@ func CreateEventsHandler(notifier *Notifier[Notification]) http.HandlerFunc {
|
||||
// return
|
||||
// }
|
||||
|
||||
listener := notifier.Listeners[userId.String()]
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-r.Context().Done():
|
||||
fmt.Fprint(w, "event: close\ndata: Connection closed\n\n")
|
||||
w.(http.Flusher).Flush()
|
||||
return
|
||||
case msg := <-listener:
|
||||
case msg := <-notifications:
|
||||
|
||||
msgString, err := json.Marshal(msg)
|
||||
if err != nil {
|
||||
|
@ -56,3 +56,42 @@ func NewNotifier[TNotification any](bufferSize int) Notifier[TNotification] {
|
||||
Listeners: make(map[string]chan TNotification),
|
||||
}
|
||||
}
|
||||
|
||||
// ----------------------------------
|
||||
|
||||
type ChannelSplitter[TNotification any] struct {
|
||||
ch chan TNotification
|
||||
|
||||
Listeners map[string]chan TNotification
|
||||
}
|
||||
|
||||
func (s *ChannelSplitter[TNotification]) Listen() {
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case msg := <-s.ch:
|
||||
for _, v := range s.Listeners {
|
||||
v <- msg
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (s *ChannelSplitter[TNotification]) Add(id string) chan TNotification {
|
||||
ch := make(chan TNotification)
|
||||
s.Listeners[id] = ch
|
||||
|
||||
return ch
|
||||
}
|
||||
|
||||
func (s *ChannelSplitter[TNotification]) Remove(id string) {
|
||||
delete(s.Listeners, id)
|
||||
}
|
||||
|
||||
func NewChannelSplitter[TNotification any](ch chan TNotification) ChannelSplitter[TNotification] {
|
||||
return ChannelSplitter[TNotification]{
|
||||
ch: ch,
|
||||
Listeners: make(map[string]chan TNotification),
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user