2 Commits

Author SHA1 Message Date
115d08a245 going back to how it was 2025-09-14 17:45:30 +01:00
b4b600bd7c Revert "refactor: creating image process to handle processing of images"
This reverts commit 8b6b9453a8f86d705ec2319c51f429faf687cb52.
2025-09-14 17:44:33 +01:00
7 changed files with 127 additions and 191 deletions

View File

@ -8,11 +8,9 @@ import (
"net/http" "net/http"
"os" "os"
"screenmark/screenmark/agents" "screenmark/screenmark/agents"
"screenmark/screenmark/imageprocessor"
"screenmark/screenmark/limits" "screenmark/screenmark/limits"
"screenmark/screenmark/middleware" "screenmark/screenmark/middleware"
"screenmark/screenmark/models" "screenmark/screenmark/models"
"screenmark/screenmark/notifier"
"strconv" "strconv"
"sync" "sync"
"time" "time"
@ -22,6 +20,62 @@ import (
"github.com/lib/pq" "github.com/lib/pq"
) )
const (
IMAGE_TYPE = "image"
LIST_TYPE = "list"
)
type imageNotification struct {
Type string
ImageID uuid.UUID
ImageName string
Status string
}
type listNotification struct {
Type string
ListID uuid.UUID
Name string
Status string
}
type Notification struct {
image *imageNotification
list *listNotification
}
func getImageNotification(image imageNotification) Notification {
return Notification{
image: &image,
}
}
func getListNotification(list listNotification) Notification {
return Notification{
list: &list,
}
}
func (n Notification) MarshalJSON() ([]byte, error) {
if n.image != nil {
return json.Marshal(n.image)
}
if n.list != nil {
return json.Marshal(n.list)
}
return nil, fmt.Errorf("no image or list present")
}
func (n *Notification) UnmarshalJSON(data []byte) error {
return fmt.Errorf("unimplemented")
}
func ListenNewImageEvents(db *sql.DB) { func ListenNewImageEvents(db *sql.DB) {
listener := pq.NewListener(os.Getenv("DB_CONNECTION"), time.Second, time.Second, func(event pq.ListenerEventType, err error) { listener := pq.NewListener(os.Getenv("DB_CONNECTION"), time.Second, time.Second, func(event pq.ListenerEventType, err error) {
if err != nil { if err != nil {
@ -95,7 +149,7 @@ func ListenNewImageEvents(db *sql.DB) {
} }
} }
func getProcessingImageStatusChannel(db *sql.DB) chan string { func ListenProcessingImageStatus(db *sql.DB, images models.ImageModel, notifier *Notifier[Notification]) {
listener := pq.NewListener(os.Getenv("DB_CONNECTION"), time.Second, time.Second, func(event pq.ListenerEventType, err error) { listener := pq.NewListener(os.Getenv("DB_CONNECTION"), time.Second, time.Second, func(event pq.ListenerEventType, err error) {
if err != nil { if err != nil {
panic(err) panic(err)
@ -103,19 +157,41 @@ func getProcessingImageStatusChannel(db *sql.DB) chan string {
}) })
defer listener.Close() defer listener.Close()
logger := createLogger("Image Status 📊", os.Stdout)
if err := listener.Listen("new_processing_image_status"); err != nil { if err := listener.Listen("new_processing_image_status"); err != nil {
panic(err) panic(err)
} }
msgChan := make(chan string)
go func() {
for data := range listener.Notify { for data := range listener.Notify {
msgChan <- data.Extra imageStringUuid := data.Extra[0:36]
} status := data.Extra[36:]
}()
return msgChan imageUuid, err := uuid.Parse(imageStringUuid)
if err != nil {
logger.Error(err)
continue
}
processingImage, err := images.GetToProcess(context.Background(), imageUuid)
if err != nil {
logger.Error("GetToProcess failed", "err", err)
continue
}
logger.Info("Update", "id", imageStringUuid, "status", status)
notification := getImageNotification(imageNotification{
Type: IMAGE_TYPE,
ImageID: processingImage.ImageID,
ImageName: processingImage.Image.ImageName,
Status: status,
})
if err := notifier.SendAndCreate(processingImage.UserID.String(), notification); err != nil {
logger.Error(err)
}
}
} }
func ListenNewStackEvents(db *sql.DB) { func ListenNewStackEvents(db *sql.DB) {
@ -174,7 +250,7 @@ func ListenNewStackEvents(db *sql.DB) {
} }
} }
func ListenProcessingStackStatus(db *sql.DB, stacks models.ListModel, notifier *notifier.Notifier[imageprocessor.Notification]) { func ListenProcessingStackStatus(db *sql.DB, stacks models.ListModel, notifier *Notifier[Notification]) {
listener := pq.NewListener(os.Getenv("DB_CONNECTION"), time.Second, time.Second, func(event pq.ListenerEventType, err error) { listener := pq.NewListener(os.Getenv("DB_CONNECTION"), time.Second, time.Second, func(event pq.ListenerEventType, err error) {
if err != nil { if err != nil {
panic(err) panic(err)
@ -206,8 +282,8 @@ func ListenProcessingStackStatus(db *sql.DB, stacks models.ListModel, notifier *
logger.Info("Update", "id", stackStringUUID, "status", status) logger.Info("Update", "id", stackStringUUID, "status", status)
notification := imageprocessor.GetListNotification(imageprocessor.ListNotification{ notification := getListNotification(listNotification{
Type: imageprocessor.LIST_TYPE, Type: LIST_TYPE,
Name: processingStack.Title, Name: processingStack.Title,
ListID: stackUUID, ListID: stackUUID,
Status: status, Status: status,
@ -225,10 +301,10 @@ func ListenProcessingStackStatus(db *sql.DB, stacks models.ListModel, notifier *
* *
* What is a reasonable default? Close the channel after 1 minute of inactivity? * What is a reasonable default? Close the channel after 1 minute of inactivity?
*/ */
func CreateEventsHandler(notifierr *notifier.Notifier[imageprocessor.Notification]) http.HandlerFunc { func CreateEventsHandler(notifier *Notifier[Notification]) http.HandlerFunc {
counter := 0 counter := 0
userSplitters := make(map[string]*notifier.ChannelSplitter[imageprocessor.Notification]) userSplitters := make(map[string]*ChannelSplitter[Notification])
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
_userId := r.Context().Value(middleware.USER_ID).(uuid.UUID) _userId := r.Context().Value(middleware.USER_ID).(uuid.UUID)
@ -244,14 +320,14 @@ func CreateEventsHandler(notifierr *notifier.Notifier[imageprocessor.Notificatio
w.Header().Set("Connection", "keep-alive") w.Header().Set("Connection", "keep-alive")
// w.(http.Flusher).Flush() // w.(http.Flusher).Flush()
if _, exists := notifierr.Listeners[userId]; !exists { if _, exists := notifier.Listeners[userId]; !exists {
notifierr.Create(userId) notifier.Create(userId)
} }
userNotifications := notifierr.Listeners[userId] userNotifications := notifier.Listeners[userId]
if _, exists := userSplitters[userId]; !exists { if _, exists := userSplitters[userId]; !exists {
splitter := notifier.NewChannelSplitter(userNotifications) splitter := NewChannelSplitter(userNotifications)
userSplitters[userId] = &splitter userSplitters[userId] = &splitter
splitter.Listen() splitter.Listen()

View File

@ -1,65 +0,0 @@
package imageprocessor
import (
"encoding/json"
"fmt"
"github.com/google/uuid"
)
const (
IMAGE_TYPE = "image"
LIST_TYPE = "list"
)
type ImageNotification struct {
Type string
ImageID uuid.UUID
ImageName string
Status string
}
func getImageNotification(image ImageNotification) Notification {
return Notification{
image: &image,
}
}
type ListNotification struct {
Type string
ListID uuid.UUID
Name string
Status string
}
type Notification struct {
image *ImageNotification
list *ListNotification
}
func GetListNotification(list ListNotification) Notification {
return Notification{
list: &list,
}
}
func (n Notification) MarshalJSON() ([]byte, error) {
if n.image != nil {
return json.Marshal(n.image)
}
if n.list != nil {
return json.Marshal(n.list)
}
return nil, fmt.Errorf("no image or list present")
}
func (n *Notification) UnmarshalJSON(data []byte) error {
return fmt.Errorf("unimplemented")
}

View File

@ -1,66 +0,0 @@
package imageprocessor
import (
"context"
"fmt"
"screenmark/screenmark/models"
"screenmark/screenmark/notifier"
"github.com/charmbracelet/log"
"github.com/google/uuid"
)
type DbImageProcessor struct {
logger *log.Logger
images models.ImageModel
incomingImages chan string
notifier *notifier.Notifier[Notification]
}
func (p *DbImageProcessor) processImage(incomingMsg string) error {
imageStringUUID := incomingMsg[0:36]
status := incomingMsg[36:]
imageUUID, err := uuid.Parse(imageStringUUID)
if err != nil {
return fmt.Errorf("parsing: %w", err)
}
processingImage, err := p.images.GetToProcess(context.Background(), imageUUID)
if err != nil {
return fmt.Errorf("get to processes: %w", err)
}
p.logger.Info("Update", "id", imageStringUUID, "status", status)
notification := getImageNotification(ImageNotification{
Type: IMAGE_TYPE,
ImageID: processingImage.ImageID,
ImageName: processingImage.Image.ImageName,
Status: status,
})
return p.notifier.SendAndCreate(processingImage.UserID.String(), notification)
}
func (p *DbImageProcessor) Work() {
for incomingMsg := range p.incomingImages {
go func() {
err := p.processImage(incomingMsg)
if err != nil {
p.logger.Error("processing image", "err", err)
}
}()
}
}
func NewImageProcessor(logger *log.Logger, imageModel models.ImageModel, incoming chan string, notifier *notifier.Notifier[Notification]) *DbImageProcessor {
return &DbImageProcessor{
logger: logger,
images: imageModel,
incomingImages: incoming,
notifier: notifier,
}
}

View File

@ -10,7 +10,6 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"screenmark/screenmark/.gen/haystack/haystack/model" "screenmark/screenmark/.gen/haystack/haystack/model"
"screenmark/screenmark/imageprocessor"
"screenmark/screenmark/limits" "screenmark/screenmark/limits"
"screenmark/screenmark/middleware" "screenmark/screenmark/middleware"
"screenmark/screenmark/models" "screenmark/screenmark/models"
@ -23,10 +22,8 @@ import (
type ImageHandler struct { type ImageHandler struct {
logger *log.Logger logger *log.Logger
imageModel models.ImageModel imageModel models.ImageModel
listModel models.ListModel
userModel models.UserModel userModel models.UserModel
limitsManager limits.LimitsManagerMethods limitsManager limits.LimitsManagerMethods
imageProcessor *imageprocessor.DbImageProcessor
} }
type ImagesReturn struct { type ImagesReturn struct {
@ -229,23 +226,19 @@ func (h *ImageHandler) CreateRoutes(r chi.Router) {
r.Get("/", h.listImages) r.Get("/", h.listImages)
r.Post("/{name}", middleware.WithLimit(h.logger, h.limitsManager.HasReachedImageLimit, h.uploadImage)) r.Post("/{name}", middleware.WithLimit(h.logger, h.limitsManager.HasReachedImageLimit, h.uploadImage))
r.Patch("/{image-id}", h.reprocessImage)
r.Delete("/{image-id}", h.deleteImage) r.Delete("/{image-id}", h.deleteImage)
}) })
} }
func CreateImageHandler(db *sql.DB, limitsManager limits.LimitsManagerMethods, imageProcessor *imageprocessor.DbImageProcessor) ImageHandler { func CreateImageHandler(db *sql.DB, limitsManager limits.LimitsManagerMethods) ImageHandler {
imageModel := models.NewImageModel(db) imageModel := models.NewImageModel(db)
userModel := models.NewUserModel(db) userModel := models.NewUserModel(db)
listModel := models.NewListModel(db)
logger := log.New(os.Stdout).WithPrefix("Images") logger := log.New(os.Stdout).WithPrefix("Images")
return ImageHandler{ return ImageHandler{
logger: logger, logger: logger,
listModel: listModel,
imageModel: imageModel, imageModel: imageModel,
userModel: userModel, userModel: userModel,
limitsManager: limitsManager, limitsManager: limitsManager,
imageProcessor: imageProcessor,
} }
} }

View File

@ -1,4 +1,4 @@
package notifier package main
import ( import (
"errors" "errors"
@ -67,11 +67,14 @@ type ChannelSplitter[TNotification any] struct {
func (s *ChannelSplitter[TNotification]) Listen() { func (s *ChannelSplitter[TNotification]) Listen() {
go func() { go func() {
for msg := range s.ch { for {
select {
case msg := <-s.ch:
for _, v := range s.Listeners { for _, v := range s.Listeners {
v <- msg v <- msg
} }
} }
}
}() }()
} }

View File

@ -1,4 +1,4 @@
package notifier package main
import ( import (
"testing" "testing"

View File

@ -5,11 +5,9 @@ import (
"os" "os"
"screenmark/screenmark/agents/client" "screenmark/screenmark/agents/client"
"screenmark/screenmark/auth" "screenmark/screenmark/auth"
"screenmark/screenmark/imageprocessor"
"screenmark/screenmark/images" "screenmark/screenmark/images"
"screenmark/screenmark/limits" "screenmark/screenmark/limits"
"screenmark/screenmark/models" "screenmark/screenmark/models"
"screenmark/screenmark/notifier"
"screenmark/screenmark/stacks" "screenmark/screenmark/stacks"
ourmiddleware "screenmark/screenmark/middleware" ourmiddleware "screenmark/screenmark/middleware"
@ -32,27 +30,24 @@ func setupRouter(db *sql.DB) chi.Router {
limitsManager := limits.CreateLimitsManager(db) limitsManager := limits.CreateLimitsManager(db)
notifier := notifier.NewNotifier[imageprocessor.Notification](10) stackHandler := stacks.CreateStackHandler(db, limitsManager)
authHandler := auth.CreateAuthHandler(db)
imageHandler := images.CreateImageHandler(db, limitsManager)
notifier := NewNotifier[Notification](10)
// Only start event listeners if not in test environment
if os.Getenv("GO_TEST_ENVIRONMENT") != "true" {
// TODO: should extract these into a notification manager // TODO: should extract these into a notification manager
// And actually make them the same code. // And actually make them the same code.
// The events are basically the same. // The events are basically the same.
go ListenNewImageEvents(db) go ListenNewImageEvents(db)
go ListenProcessingImageStatus(db, imageModel, &notifier)
logger := createLogger("Image Processor", os.Stdout)
processingChan := getProcessingImageStatusChannel(db)
imageProcessor := imageprocessor.NewImageProcessor(logger, imageModel, processingChan, &notifier)
go imageProcessor.Work()
go ListenNewStackEvents(db) go ListenNewStackEvents(db)
go ListenProcessingStackStatus(db, stackModel, &notifier) go ListenProcessingStackStatus(db, stackModel, &notifier)
}
stackHandler := stacks.CreateStackHandler(db, limitsManager)
authHandler := auth.CreateAuthHandler(db)
imageHandler := images.CreateImageHandler(db, limitsManager, imageProcessor)
r := chi.NewRouter() r := chi.NewRouter()