From ec7bd469f9e642e9002981f172384ffaa091e0d1 Mon Sep 17 00:00:00 2001 From: John Costa Date: Mon, 25 Aug 2025 15:13:29 +0100 Subject: [PATCH] sending notifications about new stacks --- backend/events.go | 109 +++++++++++++++++++++++++++++++++++++--- backend/main.go | 9 +++- backend/models/lists.go | 15 ++++++ backend/schema.sql | 14 ++++++ 4 files changed, 140 insertions(+), 7 deletions(-) diff --git a/backend/events.go b/backend/events.go index 27593fe..45e8775 100644 --- a/backend/events.go +++ b/backend/events.go @@ -19,13 +19,63 @@ import ( "github.com/lib/pq" ) -type Notification struct { +const ( + IMAGE_TYPE = "image" + LIST_TYPE = "list" +) + +type imageNotification struct { + Type string + ImageID uuid.UUID ImageName string - Status string + + Status string } -func ListenNewImageEvents(db *sql.DB, notifier *Notifier[Notification]) { +type listNotification struct { + Type string + + ListID uuid.UUID + Name string + + Status string +} + +type Notification struct { + image *imageNotification + list *listNotification +} + +func getImageNotification(image imageNotification) Notification { + return Notification{ + image: &image, + } +} + +func getListNotification(list listNotification) Notification { + return Notification{ + list: &list, + } +} + +func (n Notification) MarshalJSON() ([]byte, error) { + if n.image != nil { + return json.Marshal(n.image) + } + + if n.list != nil { + return json.Marshal(n.list) + } + + return nil, fmt.Errorf("no image or list present") +} + +func (n *Notification) UnmarshalJSON(data []byte) error { + return fmt.Errorf("unimplemented") +} + +func ListenNewImageEvents(db *sql.DB) { listener := pq.NewListener(os.Getenv("DB_CONNECTION"), time.Second, time.Second, func(event pq.ListenerEventType, err error) { if err != nil { panic(err) @@ -128,11 +178,12 @@ func ListenProcessingImageStatus(db *sql.DB, images models.ImageModel, notifier logger.Info("Update", "id", imageStringUuid, "status", status) - notification := Notification{ + notification := getImageNotification(imageNotification{ + Type: IMAGE_TYPE, ImageID: processingImage.ImageID, ImageName: processingImage.Image.ImageName, Status: status, - } + }) if err := notifier.SendAndCreate(processingImage.UserID.String(), notification); err != nil { logger.Error(err) @@ -191,6 +242,51 @@ func ListenNewStackEvents(db *sql.DB) { } } +func ListenProcessingStackStatus(db *sql.DB, stacks models.ListModel, notifier *Notifier[Notification]) { + listener := pq.NewListener(os.Getenv("DB_CONNECTION"), time.Second, time.Second, func(event pq.ListenerEventType, err error) { + if err != nil { + panic(err) + } + }) + defer listener.Close() + + logger := createLogger("Stack Status 📊", os.Stdout) + + if err := listener.Listen("new_processing_stack_status"); err != nil { + panic(err) + } + + for data := range listener.Notify { + stackStringUUID := data.Extra[0:36] + status := data.Extra[36:] + + stackUUID, err := uuid.Parse(stackStringUUID) + if err != nil { + logger.Error(err) + continue + } + + processingStack, err := stacks.GetToProcess(context.Background(), stackUUID) + if err != nil { + logger.Error("GetToProcess failed", "err", err) + continue + } + + logger.Info("Update", "id", stackStringUUID, "status", status) + + notification := getListNotification(listNotification{ + Type: LIST_TYPE, + Name: processingStack.Title, + ListID: stackUUID, + Status: status, + }) + + if err := notifier.SendAndCreate(processingStack.UserID.String(), notification); err != nil { + logger.Error(err) + } + } +} + /* * TODO: We have channels open every a user sends an image. * We never close these channels. @@ -251,7 +347,8 @@ func CreateEventsHandler(notifier *Notifier[Notification]) http.HandlerFunc { return } - fmt.Printf("Sending msg %s\n", msg) + fmt.Printf("Sending msg %s\n", msgString) + fmt.Fprintf(w, "event: data\ndata: %s\n\n", string(msgString)) w.(http.Flusher).Flush() } diff --git a/backend/main.go b/backend/main.go index 3503c88..4039d1d 100644 --- a/backend/main.go +++ b/backend/main.go @@ -28,6 +28,7 @@ func (client TestAiClient) GetImageInfo(imageName string, imageData []byte) (cli func setupRouter(db *sql.DB) chi.Router { imageModel := models.NewImageModel(db) + stackModel := models.NewListModel(db) stackHandler := stacks.CreateStackHandler(db) authHandler := auth.CreateAuthHandler(db) @@ -37,9 +38,15 @@ func setupRouter(db *sql.DB) chi.Router { // Only start event listeners if not in test environment if os.Getenv("GO_TEST_ENVIRONMENT") != "true" { - go ListenNewImageEvents(db, ¬ifier) + + // TODO: should extract these into a notification manager + // And actually make them the same code. + // The events are basically the same. + + go ListenNewImageEvents(db) go ListenProcessingImageStatus(db, imageModel, ¬ifier) go ListenNewStackEvents(db) + go ListenProcessingStackStatus(db, stackModel, ¬ifier) } r := chi.NewRouter() diff --git a/backend/models/lists.go b/backend/models/lists.go index 758ab9b..7519cb0 100644 --- a/backend/models/lists.go +++ b/backend/models/lists.go @@ -92,6 +92,21 @@ func (m ListModel) GetProcessing(ctx context.Context, processingListID uuid.UUID return list, err } +func (m ListModel) GetToProcess(ctx context.Context, listID uuid.UUID) (model.ProcessingLists, error) { + getToProcessStmt := ProcessingLists. + SELECT(ProcessingLists.AllColumns). + WHERE(ProcessingLists.ID.EQ(UUID(listID))) + + stack := []model.ProcessingLists{} + err := getToProcessStmt.QueryContext(ctx, m.dbPool, &stack) + + if len(stack) != 1 { + return model.ProcessingLists{}, fmt.Errorf("Expected 1, got %d\n", len(stack)) + } + + return stack[0], err +} + // ======================================== // UPDATE // ======================================== diff --git a/backend/schema.sql b/backend/schema.sql index 7c875dc..5e54fd1 100644 --- a/backend/schema.sql +++ b/backend/schema.sql @@ -124,6 +124,14 @@ PERFORM pg_notify('new_stack', NEW.id::text); END $$ LANGUAGE plpgsql; +CREATE OR REPLACE FUNCTION notify_new_processing_stack_status() +RETURNS TRIGGER AS $$ +BEGIN +PERFORM pg_notify('new_processing_stack_status', NEW.id::text || NEW.status::text); + RETURN NEW; +END +$$ LANGUAGE plpgsql; + /* -----| Triggers |----- */ CREATE OR REPLACE TRIGGER on_new_image AFTER INSERT @@ -142,4 +150,10 @@ ON haystack.processing_lists FOR EACH ROW EXECUTE PROCEDURE notify_new_stacks(); +CREATE OR REPLACE TRIGGER on_update_stack_progress +AFTER UPDATE OF status +ON haystack.processing_lists +FOR EACH ROW +EXECUTE PROCEDURE notify_new_processing_stack_status(); + /* -----| Test Data |----- */