From b4b600bd7cb0915694bb9d403a0171e28253736f Mon Sep 17 00:00:00 2001 From: John Costa Date: Sun, 14 Sep 2025 17:44:33 +0100 Subject: [PATCH] Revert "refactor: creating image process to handle processing of images" This reverts commit 8b6b9453a8f86d705ec2319c51f429faf687cb52. --- backend/events.go | 112 ++++++++++++++++--- backend/imageprocessor/notification.go | 65 ----------- backend/imageprocessor/processor.go | 66 ----------- backend/images/handler.go | 62 ++-------- backend/models/image.go | 22 ---- backend/{notifier => }/notifications.go | 11 +- backend/{notifier => }/notifications_test.go | 2 +- backend/router.go | 37 +++--- 8 files changed, 127 insertions(+), 250 deletions(-) delete mode 100644 backend/imageprocessor/notification.go delete mode 100644 backend/imageprocessor/processor.go rename backend/{notifier => }/notifications.go (94%) rename backend/{notifier => }/notifications_test.go (97%) diff --git a/backend/events.go b/backend/events.go index 56f6da7..c83e83a 100644 --- a/backend/events.go +++ b/backend/events.go @@ -8,11 +8,9 @@ import ( "net/http" "os" "screenmark/screenmark/agents" - "screenmark/screenmark/imageprocessor" "screenmark/screenmark/limits" "screenmark/screenmark/middleware" "screenmark/screenmark/models" - "screenmark/screenmark/notifier" "strconv" "sync" "time" @@ -22,6 +20,62 @@ import ( "github.com/lib/pq" ) +const ( + IMAGE_TYPE = "image" + LIST_TYPE = "list" +) + +type imageNotification struct { + Type string + + ImageID uuid.UUID + ImageName string + + Status string +} + +type listNotification struct { + Type string + + ListID uuid.UUID + Name string + + Status string +} + +type Notification struct { + image *imageNotification + list *listNotification +} + +func getImageNotification(image imageNotification) Notification { + return Notification{ + image: &image, + } +} + +func getListNotification(list listNotification) Notification { + return Notification{ + list: &list, + } +} + +func (n Notification) MarshalJSON() ([]byte, error) { + if n.image != nil { + return json.Marshal(n.image) + } + + if n.list != nil { + return json.Marshal(n.list) + } + + return nil, fmt.Errorf("no image or list present") +} + +func (n *Notification) UnmarshalJSON(data []byte) error { + return fmt.Errorf("unimplemented") +} + func ListenNewImageEvents(db *sql.DB) { listener := pq.NewListener(os.Getenv("DB_CONNECTION"), time.Second, time.Second, func(event pq.ListenerEventType, err error) { if err != nil { @@ -95,7 +149,7 @@ func ListenNewImageEvents(db *sql.DB) { } } -func getProcessingImageStatusChannel(db *sql.DB) chan string { +func ListenProcessingImageStatus(db *sql.DB, images models.ImageModel, notifier *Notifier[Notification]) { listener := pq.NewListener(os.Getenv("DB_CONNECTION"), time.Second, time.Second, func(event pq.ListenerEventType, err error) { if err != nil { panic(err) @@ -103,19 +157,41 @@ func getProcessingImageStatusChannel(db *sql.DB) chan string { }) defer listener.Close() + logger := createLogger("Image Status 📊", os.Stdout) + if err := listener.Listen("new_processing_image_status"); err != nil { panic(err) } - msgChan := make(chan string) + for data := range listener.Notify { + imageStringUuid := data.Extra[0:36] + status := data.Extra[36:] - go func() { - for data := range listener.Notify { - msgChan <- data.Extra + imageUuid, err := uuid.Parse(imageStringUuid) + if err != nil { + logger.Error(err) + continue } - }() - return msgChan + processingImage, err := images.GetToProcess(context.Background(), imageUuid) + if err != nil { + logger.Error("GetToProcess failed", "err", err) + continue + } + + logger.Info("Update", "id", imageStringUuid, "status", status) + + notification := getImageNotification(imageNotification{ + Type: IMAGE_TYPE, + ImageID: processingImage.ImageID, + ImageName: processingImage.Image.ImageName, + Status: status, + }) + + if err := notifier.SendAndCreate(processingImage.UserID.String(), notification); err != nil { + logger.Error(err) + } + } } func ListenNewStackEvents(db *sql.DB) { @@ -174,7 +250,7 @@ func ListenNewStackEvents(db *sql.DB) { } } -func ListenProcessingStackStatus(db *sql.DB, stacks models.ListModel, notifier *notifier.Notifier[imageprocessor.Notification]) { +func ListenProcessingStackStatus(db *sql.DB, stacks models.ListModel, notifier *Notifier[Notification]) { listener := pq.NewListener(os.Getenv("DB_CONNECTION"), time.Second, time.Second, func(event pq.ListenerEventType, err error) { if err != nil { panic(err) @@ -206,8 +282,8 @@ func ListenProcessingStackStatus(db *sql.DB, stacks models.ListModel, notifier * logger.Info("Update", "id", stackStringUUID, "status", status) - notification := imageprocessor.GetListNotification(imageprocessor.ListNotification{ - Type: imageprocessor.LIST_TYPE, + notification := getListNotification(listNotification{ + Type: LIST_TYPE, Name: processingStack.Title, ListID: stackUUID, Status: status, @@ -225,10 +301,10 @@ func ListenProcessingStackStatus(db *sql.DB, stacks models.ListModel, notifier * * * What is a reasonable default? Close the channel after 1 minute of inactivity? */ -func CreateEventsHandler(notifierr *notifier.Notifier[imageprocessor.Notification]) http.HandlerFunc { +func CreateEventsHandler(notifier *Notifier[Notification]) http.HandlerFunc { counter := 0 - userSplitters := make(map[string]*notifier.ChannelSplitter[imageprocessor.Notification]) + userSplitters := make(map[string]*ChannelSplitter[Notification]) return func(w http.ResponseWriter, r *http.Request) { _userId := r.Context().Value(middleware.USER_ID).(uuid.UUID) @@ -244,14 +320,14 @@ func CreateEventsHandler(notifierr *notifier.Notifier[imageprocessor.Notificatio w.Header().Set("Connection", "keep-alive") // w.(http.Flusher).Flush() - if _, exists := notifierr.Listeners[userId]; !exists { - notifierr.Create(userId) + if _, exists := notifier.Listeners[userId]; !exists { + notifier.Create(userId) } - userNotifications := notifierr.Listeners[userId] + userNotifications := notifier.Listeners[userId] if _, exists := userSplitters[userId]; !exists { - splitter := notifier.NewChannelSplitter(userNotifications) + splitter := NewChannelSplitter(userNotifications) userSplitters[userId] = &splitter splitter.Listen() diff --git a/backend/imageprocessor/notification.go b/backend/imageprocessor/notification.go deleted file mode 100644 index b779953..0000000 --- a/backend/imageprocessor/notification.go +++ /dev/null @@ -1,65 +0,0 @@ -package imageprocessor - -import ( - "encoding/json" - "fmt" - - "github.com/google/uuid" -) - -const ( - IMAGE_TYPE = "image" - LIST_TYPE = "list" -) - -type ImageNotification struct { - Type string - - ImageID uuid.UUID - ImageName string - - Status string -} - -func getImageNotification(image ImageNotification) Notification { - return Notification{ - image: &image, - } -} - -type ListNotification struct { - Type string - - ListID uuid.UUID - Name string - - Status string -} - -type Notification struct { - image *ImageNotification - list *ListNotification -} - -func GetListNotification(list ListNotification) Notification { - return Notification{ - list: &list, - } -} - -func (n Notification) MarshalJSON() ([]byte, error) { - if n.image != nil { - return json.Marshal(n.image) - } - - if n.list != nil { - return json.Marshal(n.list) - } - - return nil, fmt.Errorf("no image or list present") -} - -func (n *Notification) UnmarshalJSON(data []byte) error { - return fmt.Errorf("unimplemented") -} - diff --git a/backend/imageprocessor/processor.go b/backend/imageprocessor/processor.go deleted file mode 100644 index c397636..0000000 --- a/backend/imageprocessor/processor.go +++ /dev/null @@ -1,66 +0,0 @@ -package imageprocessor - -import ( - "context" - "fmt" - "screenmark/screenmark/models" - "screenmark/screenmark/notifier" - - "github.com/charmbracelet/log" - "github.com/google/uuid" -) - -type DbImageProcessor struct { - logger *log.Logger - images models.ImageModel - - incomingImages chan string - - notifier *notifier.Notifier[Notification] -} - -func (p *DbImageProcessor) processImage(incomingMsg string) error { - imageStringUUID := incomingMsg[0:36] - status := incomingMsg[36:] - - imageUUID, err := uuid.Parse(imageStringUUID) - if err != nil { - return fmt.Errorf("parsing: %w", err) - } - - processingImage, err := p.images.GetToProcess(context.Background(), imageUUID) - if err != nil { - return fmt.Errorf("get to processes: %w", err) - } - - p.logger.Info("Update", "id", imageStringUUID, "status", status) - - notification := getImageNotification(ImageNotification{ - Type: IMAGE_TYPE, - ImageID: processingImage.ImageID, - ImageName: processingImage.Image.ImageName, - Status: status, - }) - - return p.notifier.SendAndCreate(processingImage.UserID.String(), notification) -} - -func (p *DbImageProcessor) Work() { - for incomingMsg := range p.incomingImages { - go func() { - err := p.processImage(incomingMsg) - if err != nil { - p.logger.Error("processing image", "err", err) - } - }() - } -} - -func NewImageProcessor(logger *log.Logger, imageModel models.ImageModel, incoming chan string, notifier *notifier.Notifier[Notification]) *DbImageProcessor { - return &DbImageProcessor{ - logger: logger, - images: imageModel, - incomingImages: incoming, - notifier: notifier, - } -} diff --git a/backend/images/handler.go b/backend/images/handler.go index 0fd5992..3aaa199 100644 --- a/backend/images/handler.go +++ b/backend/images/handler.go @@ -10,7 +10,6 @@ import ( "os" "path/filepath" "screenmark/screenmark/.gen/haystack/haystack/model" - "screenmark/screenmark/imageprocessor" "screenmark/screenmark/limits" "screenmark/screenmark/middleware" "screenmark/screenmark/models" @@ -21,12 +20,10 @@ import ( ) type ImageHandler struct { - logger *log.Logger - imageModel models.ImageModel - listModel models.ListModel - userModel models.UserModel - limitsManager limits.LimitsManagerMethods - imageProcessor *imageprocessor.DbImageProcessor + logger *log.Logger + imageModel models.ImageModel + userModel models.UserModel + limitsManager limits.LimitsManagerMethods } type ImagesReturn struct { @@ -179,43 +176,6 @@ func (h *ImageHandler) deleteImage(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) } -func (h *ImageHandler) reprocessImage(w http.ResponseWriter, r *http.Request) { - stringImageID := chi.URLParam(r, "image-id") - imageID, err := uuid.Parse(stringImageID) - if err != nil { - w.WriteHeader(http.StatusBadRequest) - return - } - - ctx := r.Context() - - userID, err := middleware.GetUserID(ctx, h.logger, w) - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - return - } - - isAuthorized := h.imageModel.IsUserAuthorized(ctx, imageID, userID) - if !isAuthorized { - w.WriteHeader(http.StatusUnauthorized) - return - } - - imageToProcessID, err := h.imageModel.GetImageToProcessID(ctx, imageID) - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - return - } - - err = h.imageModel.SetNotStarted(ctx, imageToProcessID) - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - return - } - - w.WriteHeader(http.StatusOK) -} - func (h *ImageHandler) CreateRoutes(r chi.Router) { h.logger.Info("Mounting image router") @@ -229,23 +189,19 @@ func (h *ImageHandler) CreateRoutes(r chi.Router) { r.Get("/", h.listImages) r.Post("/{name}", middleware.WithLimit(h.logger, h.limitsManager.HasReachedImageLimit, h.uploadImage)) - r.Patch("/{image-id}", h.reprocessImage) r.Delete("/{image-id}", h.deleteImage) }) } -func CreateImageHandler(db *sql.DB, limitsManager limits.LimitsManagerMethods, imageProcessor *imageprocessor.DbImageProcessor) ImageHandler { +func CreateImageHandler(db *sql.DB, limitsManager limits.LimitsManagerMethods) ImageHandler { imageModel := models.NewImageModel(db) userModel := models.NewUserModel(db) - listModel := models.NewListModel(db) logger := log.New(os.Stdout).WithPrefix("Images") return ImageHandler{ - logger: logger, - listModel: listModel, - imageModel: imageModel, - userModel: userModel, - limitsManager: limitsManager, - imageProcessor: imageProcessor, + logger: logger, + imageModel: imageModel, + userModel: userModel, + limitsManager: limitsManager, } } diff --git a/backend/models/image.go b/backend/models/image.go index 1b9af29..edf27b4 100644 --- a/backend/models/image.go +++ b/backend/models/image.go @@ -160,28 +160,6 @@ func (m ImageModel) FinishProcessing(ctx context.Context, imageId uuid.UUID) (mo return userImage, err } -func (m ImageModel) GetImageToProcessID(ctx context.Context, imageID uuid.UUID) (uuid.UUID, error) { - getImageToProcessIDStmt := UserImagesToProcess. - SELECT(UserImagesToProcess.ID). - WHERE(UserImagesToProcess.ImageID.EQ(UUID(imageID))) - - imageToProcess := model.UserImagesToProcess{} - err := getImageToProcessIDStmt.QueryContext(ctx, m.dbPool, &imageToProcess) - - return imageToProcess.ID, err -} - -func (m ImageModel) SetNotStarted(ctx context.Context, processingImageId uuid.UUID) error { - startProcessingStmt := UserImagesToProcess. - UPDATE(UserImagesToProcess.Status). - SET(model.Progress_NotStarted). - WHERE(UserImagesToProcess.ID.EQ(UUID(processingImageId))) - - _, err := startProcessingStmt.ExecContext(ctx, m.dbPool) - - return err -} - func (m ImageModel) StartProcessing(ctx context.Context, processingImageId uuid.UUID) error { startProcessingStmt := UserImagesToProcess. UPDATE(UserImagesToProcess.Status). diff --git a/backend/notifier/notifications.go b/backend/notifications.go similarity index 94% rename from backend/notifier/notifications.go rename to backend/notifications.go index 849cc22..dc3f5ca 100644 --- a/backend/notifier/notifications.go +++ b/backend/notifications.go @@ -1,4 +1,4 @@ -package notifier +package main import ( "errors" @@ -67,9 +67,12 @@ type ChannelSplitter[TNotification any] struct { func (s *ChannelSplitter[TNotification]) Listen() { go func() { - for msg := range s.ch { - for _, v := range s.Listeners { - v <- msg + for { + select { + case msg := <-s.ch: + for _, v := range s.Listeners { + v <- msg + } } } }() diff --git a/backend/notifier/notifications_test.go b/backend/notifications_test.go similarity index 97% rename from backend/notifier/notifications_test.go rename to backend/notifications_test.go index 597d7af..2e25752 100644 --- a/backend/notifier/notifications_test.go +++ b/backend/notifications_test.go @@ -1,4 +1,4 @@ -package notifier +package main import ( "testing" diff --git a/backend/router.go b/backend/router.go index b600e03..00f1dee 100644 --- a/backend/router.go +++ b/backend/router.go @@ -5,11 +5,9 @@ import ( "os" "screenmark/screenmark/agents/client" "screenmark/screenmark/auth" - "screenmark/screenmark/imageprocessor" "screenmark/screenmark/images" "screenmark/screenmark/limits" "screenmark/screenmark/models" - "screenmark/screenmark/notifier" "screenmark/screenmark/stacks" ourmiddleware "screenmark/screenmark/middleware" @@ -32,27 +30,24 @@ func setupRouter(db *sql.DB) chi.Router { limitsManager := limits.CreateLimitsManager(db) - notifier := notifier.NewNotifier[imageprocessor.Notification](10) - - // TODO: should extract these into a notification manager - // And actually make them the same code. - // The events are basically the same. - - go ListenNewImageEvents(db) - - logger := createLogger("Image Processor", os.Stdout) - processingChan := getProcessingImageStatusChannel(db) - - imageProcessor := imageprocessor.NewImageProcessor(logger, imageModel, processingChan, ¬ifier) - - go imageProcessor.Work() - - go ListenNewStackEvents(db) - go ListenProcessingStackStatus(db, stackModel, ¬ifier) - stackHandler := stacks.CreateStackHandler(db, limitsManager) authHandler := auth.CreateAuthHandler(db) - imageHandler := images.CreateImageHandler(db, limitsManager, imageProcessor) + imageHandler := images.CreateImageHandler(db, limitsManager) + + notifier := NewNotifier[Notification](10) + + // Only start event listeners if not in test environment + if os.Getenv("GO_TEST_ENVIRONMENT") != "true" { + + // TODO: should extract these into a notification manager + // And actually make them the same code. + // The events are basically the same. + + go ListenNewImageEvents(db) + go ListenProcessingImageStatus(db, imageModel, ¬ifier) + go ListenNewStackEvents(db) + go ListenProcessingStackStatus(db, stackModel, ¬ifier) + } r := chi.NewRouter()