Compare commits
2 Commits
ae3fa08199
...
ce2cd977ac
Author | SHA1 | Date | |
---|---|---|---|
ce2cd977ac | |||
8b6b9453a8 |
@ -12,6 +12,7 @@ import (
|
|||||||
"screenmark/screenmark/limits"
|
"screenmark/screenmark/limits"
|
||||||
"screenmark/screenmark/middleware"
|
"screenmark/screenmark/middleware"
|
||||||
"screenmark/screenmark/models"
|
"screenmark/screenmark/models"
|
||||||
|
"screenmark/screenmark/notifier"
|
||||||
"strconv"
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
@ -173,7 +174,7 @@ func ListenNewStackEvents(db *sql.DB) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func ListenProcessingStackStatus(db *sql.DB, stacks models.ListModel, notifier *Notifier[imageprocessor.Notification]) {
|
func ListenProcessingStackStatus(db *sql.DB, stacks models.ListModel, notifier *notifier.Notifier[imageprocessor.Notification]) {
|
||||||
listener := pq.NewListener(os.Getenv("DB_CONNECTION"), time.Second, time.Second, func(event pq.ListenerEventType, err error) {
|
listener := pq.NewListener(os.Getenv("DB_CONNECTION"), time.Second, time.Second, func(event pq.ListenerEventType, err error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
@ -224,10 +225,10 @@ func ListenProcessingStackStatus(db *sql.DB, stacks models.ListModel, notifier *
|
|||||||
*
|
*
|
||||||
* What is a reasonable default? Close the channel after 1 minute of inactivity?
|
* What is a reasonable default? Close the channel after 1 minute of inactivity?
|
||||||
*/
|
*/
|
||||||
func CreateEventsHandler(notifier *Notifier[imageprocessor.Notification]) http.HandlerFunc {
|
func CreateEventsHandler(notifierr *notifier.Notifier[imageprocessor.Notification]) http.HandlerFunc {
|
||||||
counter := 0
|
counter := 0
|
||||||
|
|
||||||
userSplitters := make(map[string]*ChannelSplitter[imageprocessor.Notification])
|
userSplitters := make(map[string]*notifier.ChannelSplitter[imageprocessor.Notification])
|
||||||
|
|
||||||
return func(w http.ResponseWriter, r *http.Request) {
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
_userId := r.Context().Value(middleware.USER_ID).(uuid.UUID)
|
_userId := r.Context().Value(middleware.USER_ID).(uuid.UUID)
|
||||||
@ -243,14 +244,14 @@ func CreateEventsHandler(notifier *Notifier[imageprocessor.Notification]) http.H
|
|||||||
w.Header().Set("Connection", "keep-alive")
|
w.Header().Set("Connection", "keep-alive")
|
||||||
// w.(http.Flusher).Flush()
|
// w.(http.Flusher).Flush()
|
||||||
|
|
||||||
if _, exists := notifier.Listeners[userId]; !exists {
|
if _, exists := notifierr.Listeners[userId]; !exists {
|
||||||
notifier.Create(userId)
|
notifierr.Create(userId)
|
||||||
}
|
}
|
||||||
|
|
||||||
userNotifications := notifier.Listeners[userId]
|
userNotifications := notifierr.Listeners[userId]
|
||||||
|
|
||||||
if _, exists := userSplitters[userId]; !exists {
|
if _, exists := userSplitters[userId]; !exists {
|
||||||
splitter := NewChannelSplitter(userNotifications)
|
splitter := notifier.NewChannelSplitter(userNotifications)
|
||||||
|
|
||||||
userSplitters[userId] = &splitter
|
userSplitters[userId] = &splitter
|
||||||
splitter.Listen()
|
splitter.Listen()
|
||||||
|
@ -4,6 +4,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"screenmark/screenmark/models"
|
"screenmark/screenmark/models"
|
||||||
|
"screenmark/screenmark/notifier"
|
||||||
|
|
||||||
"github.com/charmbracelet/log"
|
"github.com/charmbracelet/log"
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
@ -15,7 +16,7 @@ type DbImageProcessor struct {
|
|||||||
|
|
||||||
incomingImages chan string
|
incomingImages chan string
|
||||||
|
|
||||||
outgoing func(userID string, n Notification)
|
notifier *notifier.Notifier[Notification]
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *DbImageProcessor) processImage(incomingMsg string) error {
|
func (p *DbImageProcessor) processImage(incomingMsg string) error {
|
||||||
@ -41,9 +42,7 @@ func (p *DbImageProcessor) processImage(incomingMsg string) error {
|
|||||||
Status: status,
|
Status: status,
|
||||||
})
|
})
|
||||||
|
|
||||||
p.outgoing(processingImage.UserID.String(), notification)
|
return p.notifier.SendAndCreate(processingImage.UserID.String(), notification)
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *DbImageProcessor) Work() {
|
func (p *DbImageProcessor) Work() {
|
||||||
@ -57,11 +56,11 @@ func (p *DbImageProcessor) Work() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewImageProcessor(logger *log.Logger, imageModel models.ImageModel, incoming chan string, outgoing func(userID string, n Notification)) *DbImageProcessor {
|
func NewImageProcessor(logger *log.Logger, imageModel models.ImageModel, incoming chan string, notifier *notifier.Notifier[Notification]) *DbImageProcessor {
|
||||||
return &DbImageProcessor{
|
return &DbImageProcessor{
|
||||||
logger: logger,
|
logger: logger,
|
||||||
images: imageModel,
|
images: imageModel,
|
||||||
incomingImages: incoming,
|
incomingImages: incoming,
|
||||||
outgoing: outgoing,
|
notifier: notifier,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
package main
|
package notifier
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
@ -67,14 +67,11 @@ type ChannelSplitter[TNotification any] struct {
|
|||||||
|
|
||||||
func (s *ChannelSplitter[TNotification]) Listen() {
|
func (s *ChannelSplitter[TNotification]) Listen() {
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
for msg := range s.ch {
|
||||||
select {
|
|
||||||
case msg := <-s.ch:
|
|
||||||
for _, v := range s.Listeners {
|
for _, v := range s.Listeners {
|
||||||
v <- msg
|
v <- msg
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
@ -1,4 +1,4 @@
|
|||||||
package main
|
package notifier
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"testing"
|
"testing"
|
@ -9,6 +9,7 @@ import (
|
|||||||
"screenmark/screenmark/images"
|
"screenmark/screenmark/images"
|
||||||
"screenmark/screenmark/limits"
|
"screenmark/screenmark/limits"
|
||||||
"screenmark/screenmark/models"
|
"screenmark/screenmark/models"
|
||||||
|
"screenmark/screenmark/notifier"
|
||||||
"screenmark/screenmark/stacks"
|
"screenmark/screenmark/stacks"
|
||||||
|
|
||||||
ourmiddleware "screenmark/screenmark/middleware"
|
ourmiddleware "screenmark/screenmark/middleware"
|
||||||
@ -31,7 +32,7 @@ func setupRouter(db *sql.DB) chi.Router {
|
|||||||
|
|
||||||
limitsManager := limits.CreateLimitsManager(db)
|
limitsManager := limits.CreateLimitsManager(db)
|
||||||
|
|
||||||
notifier := NewNotifier[imageprocessor.Notification](10)
|
notifier := notifier.NewNotifier[imageprocessor.Notification](10)
|
||||||
|
|
||||||
// TODO: should extract these into a notification manager
|
// TODO: should extract these into a notification manager
|
||||||
// And actually make them the same code.
|
// And actually make them the same code.
|
||||||
@ -42,11 +43,7 @@ func setupRouter(db *sql.DB) chi.Router {
|
|||||||
logger := createLogger("Image Processor", os.Stdout)
|
logger := createLogger("Image Processor", os.Stdout)
|
||||||
processingChan := getProcessingImageStatusChannel(db)
|
processingChan := getProcessingImageStatusChannel(db)
|
||||||
|
|
||||||
imageProcessor := imageprocessor.NewImageProcessor(logger, imageModel, processingChan, func(userID string, n imageprocessor.Notification) {
|
imageProcessor := imageprocessor.NewImageProcessor(logger, imageModel, processingChan, ¬ifier)
|
||||||
if err := notifier.SendAndCreate(userID, n); err != nil {
|
|
||||||
logger.Error("sending notification", "err", err)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
go imageProcessor.Work()
|
go imageProcessor.Work()
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user