From 0fcdd73a47ded19b9d52e687505315057ae66f39 Mon Sep 17 00:00:00 2001 From: John Costa Date: Sat, 12 Apr 2025 15:55:58 +0100 Subject: [PATCH] feat(sse): very rough events. Not used in the client yet feat(sse): very rough events. Not used in the client yet --- .../.gen/haystack/haystack/enum/progress.go | 18 +++++++ .../.gen/haystack/haystack/model/progress.go | 49 +++++++++++++++++ .../haystack/model/user_images_to_process.go | 1 + .../haystack/table/user_images_to_process.go | 7 ++- backend/agents/client/client.go | 1 + backend/events.go | 54 +++++++++++++++++-- backend/main.go | 42 ++++++++++++++- backend/models/image.go | 11 ++++ backend/schema.sql | 19 +++++++ frontend/src/network/index.ts | 1 + 10 files changed, 197 insertions(+), 6 deletions(-) create mode 100644 backend/.gen/haystack/haystack/enum/progress.go create mode 100644 backend/.gen/haystack/haystack/model/progress.go diff --git a/backend/.gen/haystack/haystack/enum/progress.go b/backend/.gen/haystack/haystack/enum/progress.go new file mode 100644 index 0000000..1509f3f --- /dev/null +++ b/backend/.gen/haystack/haystack/enum/progress.go @@ -0,0 +1,18 @@ +// +// Code generated by go-jet DO NOT EDIT. +// +// WARNING: Changes to this file may cause incorrect behavior +// and will be lost if the code is regenerated +// + +package enum + +import "github.com/go-jet/jet/v2/postgres" + +var Progress = &struct { + NotStarted postgres.StringExpression + InProgress postgres.StringExpression +}{ + NotStarted: postgres.NewEnumValue("not-started"), + InProgress: postgres.NewEnumValue("in-progress"), +} diff --git a/backend/.gen/haystack/haystack/model/progress.go b/backend/.gen/haystack/haystack/model/progress.go new file mode 100644 index 0000000..968b5c0 --- /dev/null +++ b/backend/.gen/haystack/haystack/model/progress.go @@ -0,0 +1,49 @@ +// +// Code generated by go-jet DO NOT EDIT. +// +// WARNING: Changes to this file may cause incorrect behavior +// and will be lost if the code is regenerated +// + +package model + +import "errors" + +type Progress string + +const ( + Progress_NotStarted Progress = "not-started" + Progress_InProgress Progress = "in-progress" +) + +var ProgressAllValues = []Progress{ + Progress_NotStarted, + Progress_InProgress, +} + +func (e *Progress) Scan(value interface{}) error { + var enumValue string + switch val := value.(type) { + case string: + enumValue = val + case []byte: + enumValue = string(val) + default: + return errors.New("jet: Invalid scan value for AllTypesEnum enum. Enum value has to be of type string or []byte") + } + + switch enumValue { + case "not-started": + *e = Progress_NotStarted + case "in-progress": + *e = Progress_InProgress + default: + return errors.New("jet: Invalid scan value '" + enumValue + "' for Progress enum") + } + + return nil +} + +func (e Progress) String() string { + return string(e) +} diff --git a/backend/.gen/haystack/haystack/model/user_images_to_process.go b/backend/.gen/haystack/haystack/model/user_images_to_process.go index 8b8ee4c..323c803 100644 --- a/backend/.gen/haystack/haystack/model/user_images_to_process.go +++ b/backend/.gen/haystack/haystack/model/user_images_to_process.go @@ -13,6 +13,7 @@ import ( type UserImagesToProcess struct { ID uuid.UUID `sql:"primary_key"` + Status Progress ImageID uuid.UUID UserID uuid.UUID } diff --git a/backend/.gen/haystack/haystack/table/user_images_to_process.go b/backend/.gen/haystack/haystack/table/user_images_to_process.go index b3df555..508ca60 100644 --- a/backend/.gen/haystack/haystack/table/user_images_to_process.go +++ b/backend/.gen/haystack/haystack/table/user_images_to_process.go @@ -18,6 +18,7 @@ type userImagesToProcessTable struct { // Columns ID postgres.ColumnString + Status postgres.ColumnString ImageID postgres.ColumnString UserID postgres.ColumnString @@ -61,10 +62,11 @@ func newUserImagesToProcessTable(schemaName, tableName, alias string) *UserImage func newUserImagesToProcessTableImpl(schemaName, tableName, alias string) userImagesToProcessTable { var ( IDColumn = postgres.StringColumn("id") + StatusColumn = postgres.StringColumn("status") ImageIDColumn = postgres.StringColumn("image_id") UserIDColumn = postgres.StringColumn("user_id") - allColumns = postgres.ColumnList{IDColumn, ImageIDColumn, UserIDColumn} - mutableColumns = postgres.ColumnList{ImageIDColumn, UserIDColumn} + allColumns = postgres.ColumnList{IDColumn, StatusColumn, ImageIDColumn, UserIDColumn} + mutableColumns = postgres.ColumnList{StatusColumn, ImageIDColumn, UserIDColumn} ) return userImagesToProcessTable{ @@ -72,6 +74,7 @@ func newUserImagesToProcessTableImpl(schemaName, tableName, alias string) userIm //Columns ID: IDColumn, + Status: StatusColumn, ImageID: ImageIDColumn, UserID: UserIDColumn, diff --git a/backend/agents/client/client.go b/backend/agents/client/client.go index 3e540c7..244d539 100644 --- a/backend/agents/client/client.go +++ b/backend/agents/client/client.go @@ -142,6 +142,7 @@ func (client AgentClient) Request(req *AgentRequestBody) (AgentResponse, error) } if len(agentResponse.Choices) != 1 { + client.Log.Errorf("Received more than 1 choice from AI \n %s\n", string(response)) return AgentResponse{}, errors.New("Unsupported. We currently only accept 1 choice from AI.") } diff --git a/backend/events.go b/backend/events.go index 47821f2..39edff8 100644 --- a/backend/events.go +++ b/backend/events.go @@ -3,6 +3,7 @@ package main import ( "context" "database/sql" + "fmt" "log" "os" "screenmark/screenmark/agents" @@ -13,7 +14,7 @@ import ( "github.com/lib/pq" ) -func ListenNewImageEvents(db *sql.DB) { +func ListenNewImageEvents(db *sql.DB, eventManager *EventManager) { listener := pq.NewListener(os.Getenv("DB_CONNECTION"), time.Second, time.Second, func(event pq.ListenerEventType, err error) { if err != nil { panic(err) @@ -36,6 +37,7 @@ func ListenNewImageEvents(db *sql.DB) { select { case parameters := <-listener.Notify: imageId := uuid.MustParse(parameters.Extra) + eventManager.listeners[parameters.Extra] = make(chan string) ctx := context.Background() @@ -62,8 +64,7 @@ func ListenNewImageEvents(db *sql.DB) { return } - _, err = imageModel.FinishProcessing(ctx, image.ID) - if err != nil { + if err := imageModel.StartProcessing(ctx, image.ID); err != nil { log.Println("Failed to FinishProcessing") log.Println(err) return @@ -80,7 +81,54 @@ func ListenNewImageEvents(db *sql.DB) { if err != nil { log.Println(err) } + + imageModel.FinishProcessing(ctx, image.ID) }() } } } + +type EventManager struct { + // Maps processing image UUID to a channel + listeners map[string]chan string +} + +func NewEventManager() EventManager { + return EventManager{ + listeners: make(map[string]chan string), + } +} + +func ListenProcessingImageStatus(db *sql.DB, eventManager *EventManager) { + listener := pq.NewListener(os.Getenv("DB_CONNECTION"), time.Second, time.Second, func(event pq.ListenerEventType, err error) { + if err != nil { + panic(err) + } + }) + defer listener.Close() + + if err := listener.Listen("new_processing_image_status"); err != nil { + panic(err) + } + + for { + select { + case data := <-listener.Notify: + stringUuid := data.Extra[0:36] + status := data.Extra[36:] + + fmt.Printf("UUID: %s\n", stringUuid) + fmt.Printf("Receiving :s\n", data.Extra) + + imageListener, exists := eventManager.listeners[stringUuid] + if !exists { + continue + } + + imageListener <- status + + close(imageListener) + delete(eventManager.listeners, stringUuid) + } + } +} diff --git a/backend/main.go b/backend/main.go index 032434a..863d968 100644 --- a/backend/main.go +++ b/backend/main.go @@ -2,6 +2,7 @@ package main import ( "bytes" + "context" "encoding/base64" "encoding/json" "fmt" @@ -12,6 +13,7 @@ import ( "screenmark/screenmark/.gen/haystack/haystack/model" "screenmark/screenmark/agents/client" "screenmark/screenmark/models" + "time" "github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5/middleware" @@ -48,7 +50,10 @@ func main() { auth := CreateAuth(mail) - go ListenNewImageEvents(db) + eventManager := NewEventManager() + + go ListenNewImageEvents(db, &eventManager) + go ListenProcessingImageStatus(db, &eventManager) r := chi.NewRouter() @@ -246,6 +251,41 @@ func main() { }) + r.Get("/image-events/{id}", func(w http.ResponseWriter, r *http.Request) { + // TODO: authentication :) + + id := r.PathValue("id") + + imageNotifier, exists := eventManager.listeners[id] + if !exists { + fmt.Println("Not found!") + w.WriteHeader(http.StatusNotFound) + return + } + + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + w.(http.Flusher).Flush() + + ctx, cancel := context.WithCancel(r.Context()) + + for { + select { + case <-ctx.Done(): + fmt.Fprint(w, "event: close\ndata: Connection closed\n\n") + w.(http.Flusher).Flush() + cancel() + return + case data := <-imageNotifier: + fmt.Printf("Status received: %s\n", data) + fmt.Fprintf(w, "data: %s-%s\n", data, time.Now().String()) + w.(http.Flusher).Flush() + cancel() + } + } + }) + r.Post("/login", func(w http.ResponseWriter, r *http.Request) { type LoginBody struct { Email string `json:"email"` diff --git a/backend/models/image.go b/backend/models/image.go index a0c29f7..e5abe31 100644 --- a/backend/models/image.go +++ b/backend/models/image.go @@ -130,6 +130,17 @@ func (m ImageModel) FinishProcessing(ctx context.Context, imageId uuid.UUID) (mo return userImage, err } +func (m ImageModel) StartProcessing(ctx context.Context, processingImageId uuid.UUID) error { + startProcessingStmt := UserImagesToProcess. + UPDATE(UserImagesToProcess.Status). + SET(model.Progress_InProgress). + WHERE(UserImagesToProcess.ID.EQ(UUID(processingImageId))) + + _, err := startProcessingStmt.ExecContext(ctx, m.dbPool) + + return err +} + func (m ImageModel) Get(ctx context.Context, imageId uuid.UUID) (ImageData, error) { getImageStmt := SELECT(UserImages.AllColumns, Image.AllColumns). FROM( diff --git a/backend/schema.sql b/backend/schema.sql index 1f21291..2163eda 100644 --- a/backend/schema.sql +++ b/backend/schema.sql @@ -2,6 +2,10 @@ DROP SCHEMA IF EXISTS haystack CASCADE; CREATE SCHEMA haystack; +/* -----| Enums |----- */ + +CREATE TYPE haystack.progress AS ENUM('not-started','in-progress'); + /* -----| Schema tables |----- */ CREATE TABLE haystack.users ( @@ -17,6 +21,7 @@ CREATE TABLE haystack.image ( CREATE TABLE haystack.user_images_to_process ( id uuid PRIMARY KEY DEFAULT gen_random_uuid(), + status haystack.progress NOT NULL DEFAULT 'not-started', image_id uuid NOT NULL UNIQUE REFERENCES haystack.image (id), user_id uuid NOT NULL REFERENCES haystack.users (id) ); @@ -155,6 +160,14 @@ BEGIN END $$ LANGUAGE plpgsql; +CREATE OR REPLACE FUNCTION notify_new_processing_image_status() +RETURNS TRIGGER AS $$ +BEGIN +PERFORM pg_notify('new_processing_image_status', NEW.id::text || NEW.status::text); + RETURN NEW; +END +$$ LANGUAGE plpgsql; + /* -----| Triggers |----- */ CREATE OR REPLACE TRIGGER on_new_image AFTER INSERT @@ -162,6 +175,12 @@ ON haystack.user_images_to_process FOR EACH ROW EXECUTE PROCEDURE notify_new_image(); +CREATE OR REPLACE TRIGGER on_update_image_progress +AFTER UPDATE OF status +ON haystack.user_images_to_process +FOR EACH ROW +EXECUTE PROCEDURE notify_new_processing_image_status(); + /* -----| Test Data |----- */ -- Insert a user diff --git a/frontend/src/network/index.ts b/frontend/src/network/index.ts index 1fd94b5..067f3e7 100644 --- a/frontend/src/network/index.ts +++ b/frontend/src/network/index.ts @@ -41,6 +41,7 @@ const sendImageResponseValidator = strictObject({ ID: pipe(string(), uuid()), ImageID: pipe(string(), uuid()), UserID: pipe(string(), uuid()), + Status: string(), }); export const sendImage = async (