BIG MASSIVE REFACTOR OMG
Ripped out literally everything to simplify the backend as much as possible. Some of the code was so horrifically complicated it's insaneeee
This commit is contained in:
95
backend/processor/image.go
Normal file
95
backend/processor/image.go
Normal file
@@ -0,0 +1,95 @@
|
||||
package processor
|
||||
|
||||
import (
|
||||
"context"
|
||||
"screenmark/screenmark/.gen/haystack/haystack/model"
|
||||
"screenmark/screenmark/agents"
|
||||
"screenmark/screenmark/agents/client"
|
||||
"screenmark/screenmark/models"
|
||||
"sync"
|
||||
|
||||
"github.com/charmbracelet/log"
|
||||
)
|
||||
|
||||
const IMAGE_PROCESS_AT_A_TIME = 10
|
||||
|
||||
type ImageProcessor struct {
|
||||
imageModel models.ImageModel
|
||||
logger *log.Logger
|
||||
|
||||
descriptionAgent agents.DescriptionAgent
|
||||
listAgent client.AgentClient
|
||||
|
||||
// TODO: add the notifier here
|
||||
|
||||
processor *Processor[model.Image]
|
||||
}
|
||||
|
||||
func (p *ImageProcessor) setImageToProcess(ctx context.Context, image model.Image) {
|
||||
updatedImage := model.Image{
|
||||
ID: image.ID,
|
||||
Status: model.Progress_InProgress,
|
||||
}
|
||||
|
||||
_, err := p.imageModel.Update(ctx, updatedImage)
|
||||
if err != nil {
|
||||
// TODO: what can we actually do here for the errors?
|
||||
// We can't stop the work for the others
|
||||
|
||||
p.logger.Error("failed to update image", "err", err)
|
||||
|
||||
// TODO: we can use context here to actually pass some information through
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (p *ImageProcessor) describe(ctx context.Context, image model.Image) {
|
||||
descriptionSubLogger := p.logger.With("describe image", image.ID)
|
||||
|
||||
err := p.descriptionAgent.Describe(descriptionSubLogger, image.ID, image.ImageName, image.Image)
|
||||
if err != nil {
|
||||
// Again, wtf do we do?
|
||||
// Although i think the agent actually returns an error when it's finished
|
||||
p.logger.Error("failed to describe image", "err", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (p *ImageProcessor) extractInfo(ctx context.Context, image model.Image) {
|
||||
err := p.listAgent.RunAgent(*image.UserID, image.ID, image.ImageName, image.Image)
|
||||
if err != nil {
|
||||
// Again, wtf do we do?
|
||||
// Although i think the agent actually returns an error when it's finished
|
||||
p.logger.Error("failed to process image", "err", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (p *ImageProcessor) processImage(image model.Image) {
|
||||
ctx := context.Background()
|
||||
|
||||
p.setImageToProcess(ctx, image)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(2)
|
||||
|
||||
go func() {
|
||||
p.describe(ctx, image)
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
go func() {
|
||||
p.extractInfo(ctx, image)
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func CreateImageProcessor(logger *log.Logger, imageModel models.ImageModel) ImageProcessor {
|
||||
imageProcessor := ImageProcessor{imageModel: imageModel, logger: logger}
|
||||
|
||||
imageProcessor.processor = NewProcessor(int(IMAGE_PROCESS_AT_A_TIME), imageProcessor.processImage)
|
||||
|
||||
return imageProcessor
|
||||
}
|
||||
19
backend/processor/processor.go
Normal file
19
backend/processor/processor.go
Normal file
@@ -0,0 +1,19 @@
|
||||
package processor
|
||||
|
||||
type Processor[TMessage any] struct {
|
||||
queue chan TMessage
|
||||
process func(message TMessage)
|
||||
}
|
||||
|
||||
func (p *Processor[TMessage]) Work() {
|
||||
for msg := range p.queue {
|
||||
p.process(msg)
|
||||
}
|
||||
}
|
||||
|
||||
func NewProcessor[TMessage any](bufferSize int, process func(message TMessage)) *Processor[TMessage] {
|
||||
return &Processor[TMessage]{
|
||||
queue: make(chan TMessage, bufferSize),
|
||||
process: process,
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user