From 391d0fdde2b69d067e30c2d3011046aab8b929c9 Mon Sep 17 00:00:00 2001 From: John Costa Date: Sat, 12 Apr 2025 15:55:58 +0100 Subject: [PATCH] FIXUP wip: notifications on starting progress --- .../.gen/haystack/haystack/enum/progress.go | 18 +++++++ .../.gen/haystack/haystack/model/progress.go | 49 +++++++++++++++++++ .../haystack/model/user_images_to_process.go | 1 + .../haystack/table/user_images_to_process.go | 7 ++- backend/events.go | 27 +++++++++- backend/main.go | 6 +++ backend/models/image.go | 8 +++ backend/schema.sql | 19 +++++++ 8 files changed, 131 insertions(+), 4 deletions(-) create mode 100644 backend/.gen/haystack/haystack/enum/progress.go create mode 100644 backend/.gen/haystack/haystack/model/progress.go diff --git a/backend/.gen/haystack/haystack/enum/progress.go b/backend/.gen/haystack/haystack/enum/progress.go new file mode 100644 index 0000000..1509f3f --- /dev/null +++ b/backend/.gen/haystack/haystack/enum/progress.go @@ -0,0 +1,18 @@ +// +// Code generated by go-jet DO NOT EDIT. +// +// WARNING: Changes to this file may cause incorrect behavior +// and will be lost if the code is regenerated +// + +package enum + +import "github.com/go-jet/jet/v2/postgres" + +var Progress = &struct { + NotStarted postgres.StringExpression + InProgress postgres.StringExpression +}{ + NotStarted: postgres.NewEnumValue("not-started"), + InProgress: postgres.NewEnumValue("in-progress"), +} diff --git a/backend/.gen/haystack/haystack/model/progress.go b/backend/.gen/haystack/haystack/model/progress.go new file mode 100644 index 0000000..968b5c0 --- /dev/null +++ b/backend/.gen/haystack/haystack/model/progress.go @@ -0,0 +1,49 @@ +// +// Code generated by go-jet DO NOT EDIT. +// +// WARNING: Changes to this file may cause incorrect behavior +// and will be lost if the code is regenerated +// + +package model + +import "errors" + +type Progress string + +const ( + Progress_NotStarted Progress = "not-started" + Progress_InProgress Progress = "in-progress" +) + +var ProgressAllValues = []Progress{ + Progress_NotStarted, + Progress_InProgress, +} + +func (e *Progress) Scan(value interface{}) error { + var enumValue string + switch val := value.(type) { + case string: + enumValue = val + case []byte: + enumValue = string(val) + default: + return errors.New("jet: Invalid scan value for AllTypesEnum enum. Enum value has to be of type string or []byte") + } + + switch enumValue { + case "not-started": + *e = Progress_NotStarted + case "in-progress": + *e = Progress_InProgress + default: + return errors.New("jet: Invalid scan value '" + enumValue + "' for Progress enum") + } + + return nil +} + +func (e Progress) String() string { + return string(e) +} diff --git a/backend/.gen/haystack/haystack/model/user_images_to_process.go b/backend/.gen/haystack/haystack/model/user_images_to_process.go index 8b8ee4c..323c803 100644 --- a/backend/.gen/haystack/haystack/model/user_images_to_process.go +++ b/backend/.gen/haystack/haystack/model/user_images_to_process.go @@ -13,6 +13,7 @@ import ( type UserImagesToProcess struct { ID uuid.UUID `sql:"primary_key"` + Status Progress ImageID uuid.UUID UserID uuid.UUID } diff --git a/backend/.gen/haystack/haystack/table/user_images_to_process.go b/backend/.gen/haystack/haystack/table/user_images_to_process.go index b3df555..508ca60 100644 --- a/backend/.gen/haystack/haystack/table/user_images_to_process.go +++ b/backend/.gen/haystack/haystack/table/user_images_to_process.go @@ -18,6 +18,7 @@ type userImagesToProcessTable struct { // Columns ID postgres.ColumnString + Status postgres.ColumnString ImageID postgres.ColumnString UserID postgres.ColumnString @@ -61,10 +62,11 @@ func newUserImagesToProcessTable(schemaName, tableName, alias string) *UserImage func newUserImagesToProcessTableImpl(schemaName, tableName, alias string) userImagesToProcessTable { var ( IDColumn = postgres.StringColumn("id") + StatusColumn = postgres.StringColumn("status") ImageIDColumn = postgres.StringColumn("image_id") UserIDColumn = postgres.StringColumn("user_id") - allColumns = postgres.ColumnList{IDColumn, ImageIDColumn, UserIDColumn} - mutableColumns = postgres.ColumnList{ImageIDColumn, UserIDColumn} + allColumns = postgres.ColumnList{IDColumn, StatusColumn, ImageIDColumn, UserIDColumn} + mutableColumns = postgres.ColumnList{StatusColumn, ImageIDColumn, UserIDColumn} ) return userImagesToProcessTable{ @@ -72,6 +74,7 @@ func newUserImagesToProcessTableImpl(schemaName, tableName, alias string) userIm //Columns ID: IDColumn, + Status: StatusColumn, ImageID: ImageIDColumn, UserID: UserIDColumn, diff --git a/backend/events.go b/backend/events.go index 47821f2..1b41ec6 100644 --- a/backend/events.go +++ b/backend/events.go @@ -3,6 +3,7 @@ package main import ( "context" "database/sql" + "fmt" "log" "os" "screenmark/screenmark/agents" @@ -62,8 +63,8 @@ func ListenNewImageEvents(db *sql.DB) { return } - _, err = imageModel.FinishProcessing(ctx, image.ID) - if err != nil { + + if err := imageModel.StartProcessing(ctx, image.ID); err != nil { log.Println("Failed to FinishProcessing") log.Println(err) return @@ -80,7 +81,29 @@ func ListenNewImageEvents(db *sql.DB) { if err != nil { log.Println(err) } + + imageModel.FinishProcessing(ctx, image.ID) }() } } } + +func ListenProcessingImageStatus(db *sql.DB) { + listener := pq.NewListener(os.Getenv("DB_CONNECTION"), time.Second, time.Second, func(event pq.ListenerEventType, err error) { + if err != nil { + panic(err) + } + }) + defer listener.Close() + + if err := listener.Listen("new_processing_image_status"); err != nil { + panic(err) + } + + for { + select { + case data := <- listener.Notify + fmt.Println(data) + } + } +} diff --git a/backend/main.go b/backend/main.go index 032434a..757f7a2 100644 --- a/backend/main.go +++ b/backend/main.go @@ -49,6 +49,7 @@ func main() { auth := CreateAuth(mail) go ListenNewImageEvents(db) + go ListenProcessingImageStatus(db) r := chi.NewRouter() @@ -244,6 +245,11 @@ func main() { w.Header().Add("Content-Type", "application/json") }) + r.Get("/image-events/{id}", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + }) }) r.Post("/login", func(w http.ResponseWriter, r *http.Request) { diff --git a/backend/models/image.go b/backend/models/image.go index a0c29f7..9b76249 100644 --- a/backend/models/image.go +++ b/backend/models/image.go @@ -130,6 +130,14 @@ func (m ImageModel) FinishProcessing(ctx context.Context, imageId uuid.UUID) (mo return userImage, err } +func (m ImageModel) StartProcessing(ctx context.Context, processingImageId uuid.UUID) error { + startProcessingStmt := UserImagesToProcess.UPDATE(UserImagesToProcess.Status).SET(model.Progress_InProgress) + + _, err := startProcessingStmt.ExecContext(ctx, m.dbPool) + + return err +} + func (m ImageModel) Get(ctx context.Context, imageId uuid.UUID) (ImageData, error) { getImageStmt := SELECT(UserImages.AllColumns, Image.AllColumns). FROM( diff --git a/backend/schema.sql b/backend/schema.sql index 1f21291..673674e 100644 --- a/backend/schema.sql +++ b/backend/schema.sql @@ -2,6 +2,10 @@ DROP SCHEMA IF EXISTS haystack CASCADE; CREATE SCHEMA haystack; +/* -----| Enums |----- */ + +CREATE TYPE haystack.progress AS ENUM('not-started','in-progress'); + /* -----| Schema tables |----- */ CREATE TABLE haystack.users ( @@ -17,6 +21,7 @@ CREATE TABLE haystack.image ( CREATE TABLE haystack.user_images_to_process ( id uuid PRIMARY KEY DEFAULT gen_random_uuid(), + status haystack.progress NOT NULL DEFAULT 'not-started', image_id uuid NOT NULL UNIQUE REFERENCES haystack.image (id), user_id uuid NOT NULL REFERENCES haystack.users (id) ); @@ -155,6 +160,14 @@ BEGIN END $$ LANGUAGE plpgsql; +CREATE OR REPLACE FUNCTION notify_new_processing_image_status() +RETURNS TRIGGER AS $$ +BEGIN + PERFORM pg_notify('new_processing_image_status', NEW.status::text); + RETURN NEW; +END +$$ LANGUAGE plpgsql; + /* -----| Triggers |----- */ CREATE OR REPLACE TRIGGER on_new_image AFTER INSERT @@ -162,6 +175,12 @@ ON haystack.user_images_to_process FOR EACH ROW EXECUTE PROCEDURE notify_new_image(); +CREATE OR REPLACE TRIGGER on_update_image_progress +AFTER UPDATE OF status +ON haystack.user_images_to_process +FOR EACH ROW +EXECUTE PROCEDURE notify_new_processing_image_status(); + /* -----| Test Data |----- */ -- Insert a user