diff --git a/backend/events.go b/backend/events.go index c7a93e3..c3cf95e 100644 --- a/backend/events.go +++ b/backend/events.go @@ -44,45 +44,42 @@ func ListenNewImageEvents(db *sql.DB, notifier *Notifier[Notification]) { panic(err) } - for { - select { - case parameters := <-listener.Notify: - imageId := uuid.MustParse(parameters.Extra) + for parameters := range listener.Notify { + imageId := uuid.MustParse(parameters.Extra) - databaseEventLog.Debug("Starting processing image", "ImageID", imageId) + databaseEventLog.Debug("Starting processing image", "ImageID", imageId) - ctx := context.Background() + ctx := context.Background() - go func() { - image, err := imageModel.GetToProcessWithData(ctx, imageId) - if err != nil { - databaseEventLog.Error("Failed to GetToProcessWithData", "error", err) - return - } + go func() { + image, err := imageModel.GetToProcessWithData(ctx, imageId) + if err != nil { + databaseEventLog.Error("Failed to GetToProcessWithData", "error", err) + return + } - splitWriter := createDbStdoutWriter(db, image.ImageID) + splitWriter := createDbStdoutWriter(db, image.ImageID) - noteAgent := agents.NewNoteAgent(createLogger("Notes 📝", splitWriter), noteModel) - contactAgent := agents.NewContactAgent(createLogger("Contacts 👥", splitWriter), contactModel) - locationAgent := agents.NewLocationAgent(createLogger("Locations 📍", splitWriter), locationModel) - eventAgent := agents.NewEventAgent(createLogger("Events 📅", splitWriter), eventModel, locationModel) + noteAgent := agents.NewNoteAgent(createLogger("Notes 📝", splitWriter), noteModel) + contactAgent := agents.NewContactAgent(createLogger("Contacts 👥", splitWriter), contactModel) + locationAgent := agents.NewLocationAgent(createLogger("Locations 📍", splitWriter), locationModel) + eventAgent := agents.NewEventAgent(createLogger("Events 📅", splitWriter), eventModel, locationModel) - if err := imageModel.StartProcessing(ctx, image.ID); err != nil { - databaseEventLog.Error("Failed to FinishProcessing", "error", err) - return - } + if err := imageModel.StartProcessing(ctx, image.ID); err != nil { + databaseEventLog.Error("Failed to FinishProcessing", "error", err) + return + } - orchestrator := agents.NewOrchestratorAgent(createLogger("Orchestrator 🎼", splitWriter), noteAgent, contactAgent, locationAgent, eventAgent, image.Image.ImageName, image.Image.Image) - orchestrator.RunAgent(image.UserID, image.ImageID, image.Image.ImageName, image.Image.Image) - _, err = imageModel.FinishProcessing(ctx, image.ID) - if err != nil { - databaseEventLog.Error("Failed to finish processing", "ImageID", imageId, "error", err) - return - } + orchestrator := agents.NewOrchestratorAgent(createLogger("Orchestrator 🎼", splitWriter), noteAgent, contactAgent, locationAgent, eventAgent, image.Image.ImageName, image.Image.Image) + orchestrator.RunAgent(image.UserID, image.ImageID, image.Image.ImageName, image.Image.Image) + _, err = imageModel.FinishProcessing(ctx, image.ID) + if err != nil { + databaseEventLog.Error("Failed to finish processing", "ImageID", imageId, "error", err) + return + } - databaseEventLog.Debug("Finished processing image", "ImageID", imageId) - }() - } + databaseEventLog.Debug("Finished processing image", "ImageID", imageId) + }() } } @@ -100,69 +97,74 @@ func ListenProcessingImageStatus(db *sql.DB, images models.ImageModel, notifier panic(err) } - for { - select { - case data := <-listener.Notify: - imageStringUuid := data.Extra[0:36] - status := data.Extra[36:] + for data := range listener.Notify { + imageStringUuid := data.Extra[0:36] + status := data.Extra[36:] - imageUuid, err := uuid.Parse(imageStringUuid) - if err != nil { - logger.Error(err) - continue - } + imageUuid, err := uuid.Parse(imageStringUuid) + if err != nil { + logger.Error(err) + continue + } - processingImage, err := images.GetToProcess(context.Background(), imageUuid) - if err != nil { - logger.Error("GetToProcess failed", "err", err) - continue - } + processingImage, err := images.GetToProcess(context.Background(), imageUuid) + if err != nil { + logger.Error("GetToProcess failed", "err", err) + continue + } - logger.Info("Update", "id", imageStringUuid, "status", status) + logger.Info("Update", "id", imageStringUuid, "status", status) - notification := Notification{ - ImageID: processingImage.ImageID, - Status: status, - } + notification := Notification{ + ImageID: processingImage.ImageID, + Status: status, + } - if err := notifier.SendAndCreate(processingImage.UserID.String(), notification); err != nil { - logger.Error(err) - } + if err := notifier.SendAndCreate(processingImage.UserID.String(), notification); err != nil { + logger.Error(err) } } } +/* + * TODO: We have channels open every a user sends an image. + * We never close these channels. + * + * What is a reasonable default? Close the channel after 1 minute of inactivity? + */ func CreateEventsHandler(notifier *Notifier[Notification]) http.HandlerFunc { counter := 0 userSplitters := make(map[string]*ChannelSplitter[Notification]) return func(w http.ResponseWriter, r *http.Request) { - userId := r.Context().Value(USER_ID).(uuid.UUID) - if userId == uuid.Nil { + _userId := r.Context().Value(USER_ID).(uuid.UUID) + if _userId == uuid.Nil { w.WriteHeader(http.StatusUnauthorized) return } + userId := _userId.String() + w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") // w.(http.Flusher).Flush() - if _, exists := notifier.Listeners[userId.String()]; !exists { - notifier.Create(userId.String()) + if _, exists := notifier.Listeners[userId]; !exists { + notifier.Create(userId) } - userNotifications := notifier.Listeners[userId.String()] + userNotifications := notifier.Listeners[userId] - if _, exists := userSplitters[userId.String()]; !exists { + if _, exists := userSplitters[userId]; !exists { splitter := NewChannelSplitter(userNotifications) - userSplitters[userId.String()] = &splitter + userSplitters[userId] = &splitter splitter.Listen() } - splitter := userSplitters[userId.String()] + splitter := userSplitters[userId] id := strconv.Itoa(counter) counter += 1 diff --git a/backend/main.go b/backend/main.go index 8df4f71..302a4d5 100644 --- a/backend/main.go +++ b/backend/main.go @@ -135,14 +135,24 @@ func main() { return } + processingImages, err := imageModel.GetProcessing(r.Context(), userId) + if err != nil { + log.Println(err) + w.WriteHeader(http.StatusNotFound) + fmt.Fprintf(w, "Something went wrong") + return + } + type ImagesReturn struct { - UserImages []models.UserImageWithImage - ImageProperties []models.TypedProperties + UserImages []models.UserImageWithImage + ImageProperties []models.TypedProperties + ProcessingImages []models.UserProcessingImage } imagesReturn := ImagesReturn{ - UserImages: images, - ImageProperties: models.GetTypedImageProperties(imageProperties), + UserImages: images, + ImageProperties: models.GetTypedImageProperties(imageProperties), + ProcessingImages: processingImages, } jsonImages, err := json.Marshal(imagesReturn) diff --git a/backend/models/image.go b/backend/models/image.go index 1406ba2..fd02b23 100644 --- a/backend/models/image.go +++ b/backend/models/image.go @@ -5,6 +5,7 @@ import ( "database/sql" "errors" "fmt" + "screenmark/screenmark/.gen/haystack/haystack/enum" "screenmark/screenmark/.gen/haystack/haystack/model" . "screenmark/screenmark/.gen/haystack/haystack/table" @@ -29,6 +30,12 @@ type ProcessingImageData struct { Image model.Image } +type UserProcessingImage struct { + model.UserImagesToProcess + + Image model.Image +} + func (m ImageModel) Process(ctx context.Context, userId uuid.UUID, image model.Image) (model.UserImagesToProcess, error) { tx, err := m.dbPool.BeginTx(ctx, nil) if err != nil { @@ -89,7 +96,7 @@ func (m ImageModel) GetToProcessWithData(ctx context.Context, imageId uuid.UUID) err := stmt.QueryContext(ctx, m.dbPool, &images) if len(images) != 1 { - return ProcessingImageData{}, errors.New(fmt.Sprintf("Expected 1, got %d\n", len(images))) + return ProcessingImageData{}, fmt.Errorf("Expected 1, got %d\n", len(images)) } return images[0], err @@ -171,6 +178,23 @@ func (m ImageModel) Get(ctx context.Context, imageId uuid.UUID) (model.Image, er return image, err } +func (m ImageModel) GetProcessing(ctx context.Context, userId uuid.UUID) ([]UserProcessingImage, error) { + getProcessingStmt := SELECT(UserImagesToProcess.AllColumns, Image.ID, Image.ImageName). + FROM( + UserImagesToProcess.INNER_JOIN( + Image, Image.ID.EQ(UserImagesToProcess.ImageID), + ), + ).WHERE( + UserImagesToProcess.UserID.EQ(UUID(userId)). + AND(UserImagesToProcess.Status.NOT_EQ(enum.Progress.Complete)), + ) + + images := []UserProcessingImage{} + err := getProcessingStmt.QueryContext(ctx, m.dbPool, &images) + + return images, err +} + func (m ImageModel) IsUserAuthorized(ctx context.Context, imageId uuid.UUID, userId uuid.UUID) bool { getImageUserId := UserImages.SELECT(UserImages.UserID).WHERE(UserImages.ImageID.EQ(UUID(imageId)))