package processor import ( "context" "fmt" "screenmark/screenmark/.gen/haystack/haystack/model" "screenmark/screenmark/agents" "screenmark/screenmark/agents/client" "screenmark/screenmark/limits" "screenmark/screenmark/models" "screenmark/screenmark/notifications" "sync" "github.com/charmbracelet/log" ) const IMAGE_PROCESS_AT_A_TIME = 10 type ImageProcessor struct { imageModel models.ImageModel logger *log.Logger descriptionAgent agents.DescriptionAgent stackAgent client.AgentClient Processor *Processor[model.Image] notifier *notifications.Notifier[notifications.Notification] } func (p *ImageProcessor) setImageToProcess(ctx context.Context, image model.Image) { err := p.imageModel.UpdateProcess(ctx, image.ID, model.Progress_InProgress) if err != nil { // TODO: what can we actually do here for the errors? // We can't stop the work for the others p.logger.Error("failed to update image", "err", err) // TODO: we can use context here to actually pass some information through return } } func (p *ImageProcessor) setImageToDone(ctx context.Context, image model.Image) { err := p.imageModel.UpdateProcess(ctx, image.ID, model.Progress_Complete) if err != nil { // TODO: what can we actually do here for the errors? // We can't stop the work for the others p.logger.Error("failed to update image", "err", err) // TODO: we can use context here to actually pass some information through return } } func (p *ImageProcessor) describe(ctx context.Context, image model.Image) { descriptionSubLogger := p.logger.With("describe image", image.ID) err := p.descriptionAgent.Describe(descriptionSubLogger, image.ID, image.ImageName, image.Image) if err != nil { // Again, wtf do we do? // Although i think the agent actually returns an error when it's finished p.logger.Error("failed to describe image", "err", err) return } } func (p *ImageProcessor) extractInfo(ctx context.Context, image model.Image) { err := p.stackAgent.RunAgent(image.UserID, image.ID, image.ImageName, image.Image) if err != nil { // Again, wtf do we do? // Although i think the agent actually returns an error when it's finished p.logger.Error("failed to process image", "err", err) return } } func (p *ImageProcessor) processImage(image model.Image) { p.logger.Info("Processing image", "ID", image.ID) ctx := context.Background() p.setImageToProcess(ctx, image) var wg sync.WaitGroup wg.Add(2) imageNotification := notifications.GetImageNotification(notifications.ImageNotification{ Type: notifications.IMAGE_TYPE, ImageID: image.ID, ImageName: image.ImageName, Status: string(model.Progress_InProgress), }) err := p.notifier.SendAndCreate(image.UserID.String(), imageNotification) if err != nil { p.logger.Error("sending in progress notification", "err", err) return } go func() { p.describe(ctx, image) wg.Done() }() go func() { p.extractInfo(ctx, image) wg.Done() }() wg.Wait() p.setImageToDone(ctx, image) // TODO: there is some repeated code here. The ergonomicts of the notifications, // isn't the best. imageNotification = notifications.GetImageNotification(notifications.ImageNotification{ Type: notifications.IMAGE_TYPE, ImageID: image.ID, ImageName: image.ImageName, Status: string(model.Progress_Complete), }) err = p.notifier.SendAndCreate(image.UserID.String(), imageNotification) if err != nil { p.logger.Error("sending done notification", "err", err) return } } func NewImageProcessor( logger *log.Logger, imageModel models.ImageModel, listModel models.StackModel, limitsManager limits.LimitsManagerMethods, notifier *notifications.Notifier[notifications.Notification], ) (ImageProcessor, error) { if notifier == nil { return ImageProcessor{}, fmt.Errorf("notifier is nil") } descriptionAgent := agents.NewDescriptionAgent(logger, imageModel) stackAgent := agents.NewStackAgent(logger, listModel, limitsManager) imageProcessor := ImageProcessor{ imageModel: imageModel, logger: logger, descriptionAgent: descriptionAgent, stackAgent: stackAgent, notifier: notifier, } imageProcessor.Processor = NewProcessor(int(IMAGE_PROCESS_AT_A_TIME), imageProcessor.processImage) return imageProcessor, nil }