feat: frontend responding to backend SSE and refetching images

This commit is contained in:
2025-04-26 20:32:01 +01:00
parent 78a28dee8d
commit d34805030f
13 changed files with 163 additions and 75 deletions

View File

@@ -12,7 +12,9 @@ import "github.com/go-jet/jet/v2/postgres"
var Progress = &struct {
NotStarted postgres.StringExpression
InProgress postgres.StringExpression
Complete postgres.StringExpression
}{
NotStarted: postgres.NewEnumValue("not-started"),
InProgress: postgres.NewEnumValue("in-progress"),
Complete: postgres.NewEnumValue("complete"),
}

View File

@@ -14,11 +14,13 @@ type Progress string
const (
Progress_NotStarted Progress = "not-started"
Progress_InProgress Progress = "in-progress"
Progress_Complete Progress = "complete"
)
var ProgressAllValues = []Progress{
Progress_NotStarted,
Progress_InProgress,
Progress_Complete,
}
func (e *Progress) Scan(value interface{}) error {
@@ -37,6 +39,8 @@ func (e *Progress) Scan(value interface{}) error {
*e = Progress_NotStarted
case "in-progress":
*e = Progress_InProgress
case "complete":
*e = Progress_Complete
default:
return errors.New("jet: Invalid scan value '" + enumValue + "' for Progress enum")
}

View File

@@ -68,11 +68,11 @@ func ListenNewImageEvents(db *sql.DB, eventManager *EventManager) {
orchestrator.RunAgent(image.UserID, image.ImageID, image.Image.ImageName, image.Image.Image)
_, err = imageModel.FinishProcessing(ctx, image.ID)
if err != nil {
databaseEventLog.Error("Failed to finish processing", "ImageID", imageId)
databaseEventLog.Error("Failed to finish processing", "ImageID", imageId, "error", err)
return
}
databaseEventLog.Debug("Starting processing image", "ImageID", imageId)
databaseEventLog.Debug("Finished processing image", "ImageID", imageId)
}()
}
}
@@ -119,8 +119,12 @@ func ListenProcessingImageStatus(db *sql.DB, eventManager *EventManager) {
logger.Info("Sending...")
imageListener <- status
// close(imageListener)
// delete(eventManager.listeners, stringUuid)
if status != "complete" {
continue
}
close(imageListener)
delete(eventManager.listeners, stringUuid)
}
}
}

View File

@@ -13,7 +13,6 @@ import (
"screenmark/screenmark/.gen/haystack/haystack/model"
"screenmark/screenmark/agents/client"
"screenmark/screenmark/models"
"time"
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
@@ -267,6 +266,8 @@ func main() {
w.Header().Set("Connection", "keep-alive")
w.(http.Flusher).Flush()
// TODO: get the current status of the image and send it across.
ctx, cancel := context.WithCancel(r.Context())
for {
@@ -277,11 +278,18 @@ func main() {
cancel()
return
case data := <-imageNotifier:
if data == "" {
cancel()
continue
}
fmt.Printf("Status received: %s\n", data)
fmt.Fprintf(w, "event: data\ndata: %s-%s\n\n", data, time.Now().String())
fmt.Fprintf(w, "event: data\ndata: %s\n\n", data)
w.(http.Flusher).Flush()
cancel()
return
if data == "complete" {
cancel()
}
}
}
})

View File

@@ -117,6 +117,19 @@ func (m ImageModel) FinishProcessing(ctx context.Context, imageId uuid.UUID) (mo
return model.UserImages{}, err
}
// Hacky. Update the status before removing so we can get our regular triggers
// to work.
updateStatusStmt := UserImagesToProcess.
UPDATE(UserImagesToProcess.Status).
SET(model.Progress_Complete).
WHERE(UserImagesToProcess.ID.EQ(UUID(imageToProcess.ID)))
_, err = updateStatusStmt.ExecContext(ctx, tx)
if err != nil {
return model.UserImages{}, err
}
removeProcessingStmt := UserImagesToProcess.
DELETE().
WHERE(UserImagesToProcess.ID.EQ(UUID(imageToProcess.ID)))

View File

@@ -4,7 +4,7 @@ CREATE SCHEMA haystack;
/* -----| Enums |----- */
CREATE TYPE haystack.progress AS ENUM('not-started','in-progress');
CREATE TYPE haystack.progress AS ENUM('not-started','in-progress', 'complete');
/* -----| Schema tables |----- */