2 Commits

Author SHA1 Message Date
ce2cd977ac frontend method to refresh image 2025-09-14 17:42:16 +01:00
8b6b9453a8 refactor: creating image process to handle processing of images
Decoupling this from the DB, it's a good step.

Not yet perfect however.
2025-09-14 17:42:16 +01:00
5 changed files with 21 additions and 27 deletions

View File

@ -12,6 +12,7 @@ import (
"screenmark/screenmark/limits" "screenmark/screenmark/limits"
"screenmark/screenmark/middleware" "screenmark/screenmark/middleware"
"screenmark/screenmark/models" "screenmark/screenmark/models"
"screenmark/screenmark/notifier"
"strconv" "strconv"
"sync" "sync"
"time" "time"
@ -173,7 +174,7 @@ func ListenNewStackEvents(db *sql.DB) {
} }
} }
func ListenProcessingStackStatus(db *sql.DB, stacks models.ListModel, notifier *Notifier[imageprocessor.Notification]) { func ListenProcessingStackStatus(db *sql.DB, stacks models.ListModel, notifier *notifier.Notifier[imageprocessor.Notification]) {
listener := pq.NewListener(os.Getenv("DB_CONNECTION"), time.Second, time.Second, func(event pq.ListenerEventType, err error) { listener := pq.NewListener(os.Getenv("DB_CONNECTION"), time.Second, time.Second, func(event pq.ListenerEventType, err error) {
if err != nil { if err != nil {
panic(err) panic(err)
@ -224,10 +225,10 @@ func ListenProcessingStackStatus(db *sql.DB, stacks models.ListModel, notifier *
* *
* What is a reasonable default? Close the channel after 1 minute of inactivity? * What is a reasonable default? Close the channel after 1 minute of inactivity?
*/ */
func CreateEventsHandler(notifier *Notifier[imageprocessor.Notification]) http.HandlerFunc { func CreateEventsHandler(notifierr *notifier.Notifier[imageprocessor.Notification]) http.HandlerFunc {
counter := 0 counter := 0
userSplitters := make(map[string]*ChannelSplitter[imageprocessor.Notification]) userSplitters := make(map[string]*notifier.ChannelSplitter[imageprocessor.Notification])
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
_userId := r.Context().Value(middleware.USER_ID).(uuid.UUID) _userId := r.Context().Value(middleware.USER_ID).(uuid.UUID)
@ -243,14 +244,14 @@ func CreateEventsHandler(notifier *Notifier[imageprocessor.Notification]) http.H
w.Header().Set("Connection", "keep-alive") w.Header().Set("Connection", "keep-alive")
// w.(http.Flusher).Flush() // w.(http.Flusher).Flush()
if _, exists := notifier.Listeners[userId]; !exists { if _, exists := notifierr.Listeners[userId]; !exists {
notifier.Create(userId) notifierr.Create(userId)
} }
userNotifications := notifier.Listeners[userId] userNotifications := notifierr.Listeners[userId]
if _, exists := userSplitters[userId]; !exists { if _, exists := userSplitters[userId]; !exists {
splitter := NewChannelSplitter(userNotifications) splitter := notifier.NewChannelSplitter(userNotifications)
userSplitters[userId] = &splitter userSplitters[userId] = &splitter
splitter.Listen() splitter.Listen()

View File

@ -4,6 +4,7 @@ import (
"context" "context"
"fmt" "fmt"
"screenmark/screenmark/models" "screenmark/screenmark/models"
"screenmark/screenmark/notifier"
"github.com/charmbracelet/log" "github.com/charmbracelet/log"
"github.com/google/uuid" "github.com/google/uuid"
@ -15,7 +16,7 @@ type DbImageProcessor struct {
incomingImages chan string incomingImages chan string
outgoing func(userID string, n Notification) notifier *notifier.Notifier[Notification]
} }
func (p *DbImageProcessor) processImage(incomingMsg string) error { func (p *DbImageProcessor) processImage(incomingMsg string) error {
@ -41,9 +42,7 @@ func (p *DbImageProcessor) processImage(incomingMsg string) error {
Status: status, Status: status,
}) })
p.outgoing(processingImage.UserID.String(), notification) return p.notifier.SendAndCreate(processingImage.UserID.String(), notification)
return nil
} }
func (p *DbImageProcessor) Work() { func (p *DbImageProcessor) Work() {
@ -57,11 +56,11 @@ func (p *DbImageProcessor) Work() {
} }
} }
func NewImageProcessor(logger *log.Logger, imageModel models.ImageModel, incoming chan string, outgoing func(userID string, n Notification)) *DbImageProcessor { func NewImageProcessor(logger *log.Logger, imageModel models.ImageModel, incoming chan string, notifier *notifier.Notifier[Notification]) *DbImageProcessor {
return &DbImageProcessor{ return &DbImageProcessor{
logger: logger, logger: logger,
images: imageModel, images: imageModel,
incomingImages: incoming, incomingImages: incoming,
outgoing: outgoing, notifier: notifier,
} }
} }

View File

@ -1,4 +1,4 @@
package main package notifier
import ( import (
"errors" "errors"
@ -67,12 +67,9 @@ type ChannelSplitter[TNotification any] struct {
func (s *ChannelSplitter[TNotification]) Listen() { func (s *ChannelSplitter[TNotification]) Listen() {
go func() { go func() {
for { for msg := range s.ch {
select { for _, v := range s.Listeners {
case msg := <-s.ch: v <- msg
for _, v := range s.Listeners {
v <- msg
}
} }
} }
}() }()

View File

@ -1,4 +1,4 @@
package main package notifier
import ( import (
"testing" "testing"

View File

@ -9,6 +9,7 @@ import (
"screenmark/screenmark/images" "screenmark/screenmark/images"
"screenmark/screenmark/limits" "screenmark/screenmark/limits"
"screenmark/screenmark/models" "screenmark/screenmark/models"
"screenmark/screenmark/notifier"
"screenmark/screenmark/stacks" "screenmark/screenmark/stacks"
ourmiddleware "screenmark/screenmark/middleware" ourmiddleware "screenmark/screenmark/middleware"
@ -31,7 +32,7 @@ func setupRouter(db *sql.DB) chi.Router {
limitsManager := limits.CreateLimitsManager(db) limitsManager := limits.CreateLimitsManager(db)
notifier := NewNotifier[imageprocessor.Notification](10) notifier := notifier.NewNotifier[imageprocessor.Notification](10)
// TODO: should extract these into a notification manager // TODO: should extract these into a notification manager
// And actually make them the same code. // And actually make them the same code.
@ -42,11 +43,7 @@ func setupRouter(db *sql.DB) chi.Router {
logger := createLogger("Image Processor", os.Stdout) logger := createLogger("Image Processor", os.Stdout)
processingChan := getProcessingImageStatusChannel(db) processingChan := getProcessingImageStatusChannel(db)
imageProcessor := imageprocessor.NewImageProcessor(logger, imageModel, processingChan, func(userID string, n imageprocessor.Notification) { imageProcessor := imageprocessor.NewImageProcessor(logger, imageModel, processingChan, &notifier)
if err := notifier.SendAndCreate(userID, n); err != nil {
logger.Error("sending notification", "err", err)
}
})
go imageProcessor.Work() go imageProcessor.Work()