From 8b6b9453a8f86d705ec2319c51f429faf687cb52 Mon Sep 17 00:00:00 2001 From: John Costa Date: Sun, 14 Sep 2025 17:17:54 +0100 Subject: [PATCH] refactor: creating image process to handle processing of images Decoupling this from the DB, it's a good step. Not yet perfect however. --- backend/events.go | 112 +++---------------- backend/imageprocessor/notification.go | 65 +++++++++++ backend/imageprocessor/processor.go | 66 +++++++++++ backend/images/handler.go | 62 ++++++++-- backend/models/image.go | 22 ++++ backend/{ => notifier}/notifications.go | 11 +- backend/{ => notifier}/notifications_test.go | 2 +- backend/router.go | 37 +++--- 8 files changed, 250 insertions(+), 127 deletions(-) create mode 100644 backend/imageprocessor/notification.go create mode 100644 backend/imageprocessor/processor.go rename backend/{ => notifier}/notifications.go (94%) rename backend/{ => notifier}/notifications_test.go (97%) diff --git a/backend/events.go b/backend/events.go index c83e83a..56f6da7 100644 --- a/backend/events.go +++ b/backend/events.go @@ -8,9 +8,11 @@ import ( "net/http" "os" "screenmark/screenmark/agents" + "screenmark/screenmark/imageprocessor" "screenmark/screenmark/limits" "screenmark/screenmark/middleware" "screenmark/screenmark/models" + "screenmark/screenmark/notifier" "strconv" "sync" "time" @@ -20,62 +22,6 @@ import ( "github.com/lib/pq" ) -const ( - IMAGE_TYPE = "image" - LIST_TYPE = "list" -) - -type imageNotification struct { - Type string - - ImageID uuid.UUID - ImageName string - - Status string -} - -type listNotification struct { - Type string - - ListID uuid.UUID - Name string - - Status string -} - -type Notification struct { - image *imageNotification - list *listNotification -} - -func getImageNotification(image imageNotification) Notification { - return Notification{ - image: &image, - } -} - -func getListNotification(list listNotification) Notification { - return Notification{ - list: &list, - } -} - -func (n Notification) MarshalJSON() ([]byte, error) { - if n.image != nil { - return json.Marshal(n.image) - } - - if n.list != nil { - return json.Marshal(n.list) - } - - return nil, fmt.Errorf("no image or list present") -} - -func (n *Notification) UnmarshalJSON(data []byte) error { - return fmt.Errorf("unimplemented") -} - func ListenNewImageEvents(db *sql.DB) { listener := pq.NewListener(os.Getenv("DB_CONNECTION"), time.Second, time.Second, func(event pq.ListenerEventType, err error) { if err != nil { @@ -149,7 +95,7 @@ func ListenNewImageEvents(db *sql.DB) { } } -func ListenProcessingImageStatus(db *sql.DB, images models.ImageModel, notifier *Notifier[Notification]) { +func getProcessingImageStatusChannel(db *sql.DB) chan string { listener := pq.NewListener(os.Getenv("DB_CONNECTION"), time.Second, time.Second, func(event pq.ListenerEventType, err error) { if err != nil { panic(err) @@ -157,41 +103,19 @@ func ListenProcessingImageStatus(db *sql.DB, images models.ImageModel, notifier }) defer listener.Close() - logger := createLogger("Image Status 📊", os.Stdout) - if err := listener.Listen("new_processing_image_status"); err != nil { panic(err) } - for data := range listener.Notify { - imageStringUuid := data.Extra[0:36] - status := data.Extra[36:] + msgChan := make(chan string) - imageUuid, err := uuid.Parse(imageStringUuid) - if err != nil { - logger.Error(err) - continue + go func() { + for data := range listener.Notify { + msgChan <- data.Extra } + }() - processingImage, err := images.GetToProcess(context.Background(), imageUuid) - if err != nil { - logger.Error("GetToProcess failed", "err", err) - continue - } - - logger.Info("Update", "id", imageStringUuid, "status", status) - - notification := getImageNotification(imageNotification{ - Type: IMAGE_TYPE, - ImageID: processingImage.ImageID, - ImageName: processingImage.Image.ImageName, - Status: status, - }) - - if err := notifier.SendAndCreate(processingImage.UserID.String(), notification); err != nil { - logger.Error(err) - } - } + return msgChan } func ListenNewStackEvents(db *sql.DB) { @@ -250,7 +174,7 @@ func ListenNewStackEvents(db *sql.DB) { } } -func ListenProcessingStackStatus(db *sql.DB, stacks models.ListModel, notifier *Notifier[Notification]) { +func ListenProcessingStackStatus(db *sql.DB, stacks models.ListModel, notifier *notifier.Notifier[imageprocessor.Notification]) { listener := pq.NewListener(os.Getenv("DB_CONNECTION"), time.Second, time.Second, func(event pq.ListenerEventType, err error) { if err != nil { panic(err) @@ -282,8 +206,8 @@ func ListenProcessingStackStatus(db *sql.DB, stacks models.ListModel, notifier * logger.Info("Update", "id", stackStringUUID, "status", status) - notification := getListNotification(listNotification{ - Type: LIST_TYPE, + notification := imageprocessor.GetListNotification(imageprocessor.ListNotification{ + Type: imageprocessor.LIST_TYPE, Name: processingStack.Title, ListID: stackUUID, Status: status, @@ -301,10 +225,10 @@ func ListenProcessingStackStatus(db *sql.DB, stacks models.ListModel, notifier * * * What is a reasonable default? Close the channel after 1 minute of inactivity? */ -func CreateEventsHandler(notifier *Notifier[Notification]) http.HandlerFunc { +func CreateEventsHandler(notifierr *notifier.Notifier[imageprocessor.Notification]) http.HandlerFunc { counter := 0 - userSplitters := make(map[string]*ChannelSplitter[Notification]) + userSplitters := make(map[string]*notifier.ChannelSplitter[imageprocessor.Notification]) return func(w http.ResponseWriter, r *http.Request) { _userId := r.Context().Value(middleware.USER_ID).(uuid.UUID) @@ -320,14 +244,14 @@ func CreateEventsHandler(notifier *Notifier[Notification]) http.HandlerFunc { w.Header().Set("Connection", "keep-alive") // w.(http.Flusher).Flush() - if _, exists := notifier.Listeners[userId]; !exists { - notifier.Create(userId) + if _, exists := notifierr.Listeners[userId]; !exists { + notifierr.Create(userId) } - userNotifications := notifier.Listeners[userId] + userNotifications := notifierr.Listeners[userId] if _, exists := userSplitters[userId]; !exists { - splitter := NewChannelSplitter(userNotifications) + splitter := notifier.NewChannelSplitter(userNotifications) userSplitters[userId] = &splitter splitter.Listen() diff --git a/backend/imageprocessor/notification.go b/backend/imageprocessor/notification.go new file mode 100644 index 0000000..b779953 --- /dev/null +++ b/backend/imageprocessor/notification.go @@ -0,0 +1,65 @@ +package imageprocessor + +import ( + "encoding/json" + "fmt" + + "github.com/google/uuid" +) + +const ( + IMAGE_TYPE = "image" + LIST_TYPE = "list" +) + +type ImageNotification struct { + Type string + + ImageID uuid.UUID + ImageName string + + Status string +} + +func getImageNotification(image ImageNotification) Notification { + return Notification{ + image: &image, + } +} + +type ListNotification struct { + Type string + + ListID uuid.UUID + Name string + + Status string +} + +type Notification struct { + image *ImageNotification + list *ListNotification +} + +func GetListNotification(list ListNotification) Notification { + return Notification{ + list: &list, + } +} + +func (n Notification) MarshalJSON() ([]byte, error) { + if n.image != nil { + return json.Marshal(n.image) + } + + if n.list != nil { + return json.Marshal(n.list) + } + + return nil, fmt.Errorf("no image or list present") +} + +func (n *Notification) UnmarshalJSON(data []byte) error { + return fmt.Errorf("unimplemented") +} + diff --git a/backend/imageprocessor/processor.go b/backend/imageprocessor/processor.go new file mode 100644 index 0000000..c397636 --- /dev/null +++ b/backend/imageprocessor/processor.go @@ -0,0 +1,66 @@ +package imageprocessor + +import ( + "context" + "fmt" + "screenmark/screenmark/models" + "screenmark/screenmark/notifier" + + "github.com/charmbracelet/log" + "github.com/google/uuid" +) + +type DbImageProcessor struct { + logger *log.Logger + images models.ImageModel + + incomingImages chan string + + notifier *notifier.Notifier[Notification] +} + +func (p *DbImageProcessor) processImage(incomingMsg string) error { + imageStringUUID := incomingMsg[0:36] + status := incomingMsg[36:] + + imageUUID, err := uuid.Parse(imageStringUUID) + if err != nil { + return fmt.Errorf("parsing: %w", err) + } + + processingImage, err := p.images.GetToProcess(context.Background(), imageUUID) + if err != nil { + return fmt.Errorf("get to processes: %w", err) + } + + p.logger.Info("Update", "id", imageStringUUID, "status", status) + + notification := getImageNotification(ImageNotification{ + Type: IMAGE_TYPE, + ImageID: processingImage.ImageID, + ImageName: processingImage.Image.ImageName, + Status: status, + }) + + return p.notifier.SendAndCreate(processingImage.UserID.String(), notification) +} + +func (p *DbImageProcessor) Work() { + for incomingMsg := range p.incomingImages { + go func() { + err := p.processImage(incomingMsg) + if err != nil { + p.logger.Error("processing image", "err", err) + } + }() + } +} + +func NewImageProcessor(logger *log.Logger, imageModel models.ImageModel, incoming chan string, notifier *notifier.Notifier[Notification]) *DbImageProcessor { + return &DbImageProcessor{ + logger: logger, + images: imageModel, + incomingImages: incoming, + notifier: notifier, + } +} diff --git a/backend/images/handler.go b/backend/images/handler.go index 3aaa199..0fd5992 100644 --- a/backend/images/handler.go +++ b/backend/images/handler.go @@ -10,6 +10,7 @@ import ( "os" "path/filepath" "screenmark/screenmark/.gen/haystack/haystack/model" + "screenmark/screenmark/imageprocessor" "screenmark/screenmark/limits" "screenmark/screenmark/middleware" "screenmark/screenmark/models" @@ -20,10 +21,12 @@ import ( ) type ImageHandler struct { - logger *log.Logger - imageModel models.ImageModel - userModel models.UserModel - limitsManager limits.LimitsManagerMethods + logger *log.Logger + imageModel models.ImageModel + listModel models.ListModel + userModel models.UserModel + limitsManager limits.LimitsManagerMethods + imageProcessor *imageprocessor.DbImageProcessor } type ImagesReturn struct { @@ -176,6 +179,43 @@ func (h *ImageHandler) deleteImage(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) } +func (h *ImageHandler) reprocessImage(w http.ResponseWriter, r *http.Request) { + stringImageID := chi.URLParam(r, "image-id") + imageID, err := uuid.Parse(stringImageID) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + + ctx := r.Context() + + userID, err := middleware.GetUserID(ctx, h.logger, w) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + + isAuthorized := h.imageModel.IsUserAuthorized(ctx, imageID, userID) + if !isAuthorized { + w.WriteHeader(http.StatusUnauthorized) + return + } + + imageToProcessID, err := h.imageModel.GetImageToProcessID(ctx, imageID) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + + err = h.imageModel.SetNotStarted(ctx, imageToProcessID) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusOK) +} + func (h *ImageHandler) CreateRoutes(r chi.Router) { h.logger.Info("Mounting image router") @@ -189,19 +229,23 @@ func (h *ImageHandler) CreateRoutes(r chi.Router) { r.Get("/", h.listImages) r.Post("/{name}", middleware.WithLimit(h.logger, h.limitsManager.HasReachedImageLimit, h.uploadImage)) + r.Patch("/{image-id}", h.reprocessImage) r.Delete("/{image-id}", h.deleteImage) }) } -func CreateImageHandler(db *sql.DB, limitsManager limits.LimitsManagerMethods) ImageHandler { +func CreateImageHandler(db *sql.DB, limitsManager limits.LimitsManagerMethods, imageProcessor *imageprocessor.DbImageProcessor) ImageHandler { imageModel := models.NewImageModel(db) userModel := models.NewUserModel(db) + listModel := models.NewListModel(db) logger := log.New(os.Stdout).WithPrefix("Images") return ImageHandler{ - logger: logger, - imageModel: imageModel, - userModel: userModel, - limitsManager: limitsManager, + logger: logger, + listModel: listModel, + imageModel: imageModel, + userModel: userModel, + limitsManager: limitsManager, + imageProcessor: imageProcessor, } } diff --git a/backend/models/image.go b/backend/models/image.go index edf27b4..1b9af29 100644 --- a/backend/models/image.go +++ b/backend/models/image.go @@ -160,6 +160,28 @@ func (m ImageModel) FinishProcessing(ctx context.Context, imageId uuid.UUID) (mo return userImage, err } +func (m ImageModel) GetImageToProcessID(ctx context.Context, imageID uuid.UUID) (uuid.UUID, error) { + getImageToProcessIDStmt := UserImagesToProcess. + SELECT(UserImagesToProcess.ID). + WHERE(UserImagesToProcess.ImageID.EQ(UUID(imageID))) + + imageToProcess := model.UserImagesToProcess{} + err := getImageToProcessIDStmt.QueryContext(ctx, m.dbPool, &imageToProcess) + + return imageToProcess.ID, err +} + +func (m ImageModel) SetNotStarted(ctx context.Context, processingImageId uuid.UUID) error { + startProcessingStmt := UserImagesToProcess. + UPDATE(UserImagesToProcess.Status). + SET(model.Progress_NotStarted). + WHERE(UserImagesToProcess.ID.EQ(UUID(processingImageId))) + + _, err := startProcessingStmt.ExecContext(ctx, m.dbPool) + + return err +} + func (m ImageModel) StartProcessing(ctx context.Context, processingImageId uuid.UUID) error { startProcessingStmt := UserImagesToProcess. UPDATE(UserImagesToProcess.Status). diff --git a/backend/notifications.go b/backend/notifier/notifications.go similarity index 94% rename from backend/notifications.go rename to backend/notifier/notifications.go index dc3f5ca..849cc22 100644 --- a/backend/notifications.go +++ b/backend/notifier/notifications.go @@ -1,4 +1,4 @@ -package main +package notifier import ( "errors" @@ -67,12 +67,9 @@ type ChannelSplitter[TNotification any] struct { func (s *ChannelSplitter[TNotification]) Listen() { go func() { - for { - select { - case msg := <-s.ch: - for _, v := range s.Listeners { - v <- msg - } + for msg := range s.ch { + for _, v := range s.Listeners { + v <- msg } } }() diff --git a/backend/notifications_test.go b/backend/notifier/notifications_test.go similarity index 97% rename from backend/notifications_test.go rename to backend/notifier/notifications_test.go index 2e25752..597d7af 100644 --- a/backend/notifications_test.go +++ b/backend/notifier/notifications_test.go @@ -1,4 +1,4 @@ -package main +package notifier import ( "testing" diff --git a/backend/router.go b/backend/router.go index 00f1dee..b600e03 100644 --- a/backend/router.go +++ b/backend/router.go @@ -5,9 +5,11 @@ import ( "os" "screenmark/screenmark/agents/client" "screenmark/screenmark/auth" + "screenmark/screenmark/imageprocessor" "screenmark/screenmark/images" "screenmark/screenmark/limits" "screenmark/screenmark/models" + "screenmark/screenmark/notifier" "screenmark/screenmark/stacks" ourmiddleware "screenmark/screenmark/middleware" @@ -30,24 +32,27 @@ func setupRouter(db *sql.DB) chi.Router { limitsManager := limits.CreateLimitsManager(db) + notifier := notifier.NewNotifier[imageprocessor.Notification](10) + + // TODO: should extract these into a notification manager + // And actually make them the same code. + // The events are basically the same. + + go ListenNewImageEvents(db) + + logger := createLogger("Image Processor", os.Stdout) + processingChan := getProcessingImageStatusChannel(db) + + imageProcessor := imageprocessor.NewImageProcessor(logger, imageModel, processingChan, ¬ifier) + + go imageProcessor.Work() + + go ListenNewStackEvents(db) + go ListenProcessingStackStatus(db, stackModel, ¬ifier) + stackHandler := stacks.CreateStackHandler(db, limitsManager) authHandler := auth.CreateAuthHandler(db) - imageHandler := images.CreateImageHandler(db, limitsManager) - - notifier := NewNotifier[Notification](10) - - // Only start event listeners if not in test environment - if os.Getenv("GO_TEST_ENVIRONMENT") != "true" { - - // TODO: should extract these into a notification manager - // And actually make them the same code. - // The events are basically the same. - - go ListenNewImageEvents(db) - go ListenProcessingImageStatus(db, imageModel, ¬ifier) - go ListenNewStackEvents(db) - go ListenProcessingStackStatus(db, stackModel, ¬ifier) - } + imageHandler := images.CreateImageHandler(db, limitsManager, imageProcessor) r := chi.NewRouter()