wip: processing images
This commit is contained in:
@ -9,9 +9,11 @@ import (
|
|||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"screenmark/screenmark/.gen/haystack/haystack/model"
|
||||||
"screenmark/screenmark/limits"
|
"screenmark/screenmark/limits"
|
||||||
"screenmark/screenmark/middleware"
|
"screenmark/screenmark/middleware"
|
||||||
"screenmark/screenmark/models"
|
"screenmark/screenmark/models"
|
||||||
|
"screenmark/screenmark/processor"
|
||||||
|
|
||||||
"github.com/charmbracelet/log"
|
"github.com/charmbracelet/log"
|
||||||
"github.com/go-chi/chi/v5"
|
"github.com/go-chi/chi/v5"
|
||||||
@ -27,6 +29,8 @@ type ImageHandler struct {
|
|||||||
limitsManager limits.LimitsManagerMethods
|
limitsManager limits.LimitsManagerMethods
|
||||||
|
|
||||||
jwtManager *middleware.JwtManager
|
jwtManager *middleware.JwtManager
|
||||||
|
|
||||||
|
processor *processor.Processor[model.Image]
|
||||||
}
|
}
|
||||||
|
|
||||||
type ImagesReturn struct {
|
type ImagesReturn struct {
|
||||||
@ -139,16 +143,17 @@ func (h *ImageHandler) uploadImage(w http.ResponseWriter, r *http.Request) {
|
|||||||
middleware.WriteErrorBadRequest(h.logger, "unsupported content type, need octet-stream or base64", w)
|
middleware.WriteErrorBadRequest(h.logger, "unsupported content type, need octet-stream or base64", w)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx := r.Context()
|
ctx := r.Context()
|
||||||
|
|
||||||
err = h.imageModel.Save(ctx, imageName, image, userID)
|
newImage, err := h.imageModel.Save(ctx, imageName, image, userID)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
middleware.WriteErrorInternal(h.logger, "could not save image to DB", w)
|
middleware.WriteErrorInternal(h.logger, "could not save image to DB: "+err.Error(), w)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
h.logger.Info("About to add image")
|
||||||
|
h.processor.Add(newImage)
|
||||||
|
|
||||||
w.WriteHeader(http.StatusOK)
|
w.WriteHeader(http.StatusOK)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -202,7 +207,12 @@ func (h *ImageHandler) CreateRoutes(r chi.Router) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func CreateImageHandler(db *sql.DB, limitsManager limits.LimitsManagerMethods, jwtManager *middleware.JwtManager) ImageHandler {
|
func CreateImageHandler(
|
||||||
|
db *sql.DB,
|
||||||
|
limitsManager limits.LimitsManagerMethods,
|
||||||
|
jwtManager *middleware.JwtManager,
|
||||||
|
processor *processor.Processor[model.Image],
|
||||||
|
) ImageHandler {
|
||||||
imageModel := models.NewImageModel(db)
|
imageModel := models.NewImageModel(db)
|
||||||
userModel := models.NewUserModel(db)
|
userModel := models.NewUserModel(db)
|
||||||
logger := log.New(os.Stdout).WithPrefix("Images")
|
logger := log.New(os.Stdout).WithPrefix("Images")
|
||||||
@ -213,5 +223,6 @@ func CreateImageHandler(db *sql.DB, limitsManager limits.LimitsManagerMethods, j
|
|||||||
userModel: userModel,
|
userModel: userModel,
|
||||||
limitsManager: limitsManager,
|
limitsManager: limitsManager,
|
||||||
jwtManager: jwtManager,
|
jwtManager: jwtManager,
|
||||||
|
processor: processor,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -35,6 +35,7 @@ func writeError(logger *log.Logger, error string, w http.ResponseWriter, code in
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
logger.Error("writing error", "error", error)
|
||||||
w.Write(jsonObject)
|
w.Write(jsonObject)
|
||||||
w.WriteHeader(code)
|
w.WriteHeader(code)
|
||||||
}
|
}
|
||||||
|
@ -17,13 +17,15 @@ type ImageModel struct {
|
|||||||
dbPool *sql.DB
|
dbPool *sql.DB
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m ImageModel) Save(ctx context.Context, name string, image []byte, userID uuid.UUID) error {
|
func (m ImageModel) Save(ctx context.Context, name string, image []byte, userID uuid.UUID) (model.Image, error) {
|
||||||
saveImageStmt := Image.INSERT(Image.ImageName, Image.Image, Image.UserID).
|
saveImageStmt := Image.INSERT(Image.ImageName, Image.Image, Image.Description, Image.UserID).
|
||||||
VALUES(name, image, userID)
|
VALUES(name, image, "", userID).
|
||||||
|
RETURNING(Image.AllColumns)
|
||||||
|
|
||||||
_, err := saveImageStmt.ExecContext(ctx, m.dbPool)
|
newImage := model.Image{}
|
||||||
|
err := saveImageStmt.QueryContext(ctx, m.dbPool, &newImage)
|
||||||
|
|
||||||
return err
|
return newImage, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m ImageModel) Get(ctx context.Context, imageID uuid.UUID) (model.Image, bool, error) {
|
func (m ImageModel) Get(ctx context.Context, imageID uuid.UUID) (model.Image, bool, error) {
|
||||||
|
@ -22,7 +22,7 @@ type ImageProcessor struct {
|
|||||||
|
|
||||||
// TODO: add the notifier here
|
// TODO: add the notifier here
|
||||||
|
|
||||||
processor *Processor[model.Image]
|
Processor *Processor[model.Image]
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *ImageProcessor) setImageToProcess(ctx context.Context, image model.Image) {
|
func (p *ImageProcessor) setImageToProcess(ctx context.Context, image model.Image) {
|
||||||
@ -66,6 +66,8 @@ func (p *ImageProcessor) extractInfo(ctx context.Context, image model.Image) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (p *ImageProcessor) processImage(image model.Image) {
|
func (p *ImageProcessor) processImage(image model.Image) {
|
||||||
|
p.logger.Info("Processing image", "ID", image.ID)
|
||||||
|
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
p.setImageToProcess(ctx, image)
|
p.setImageToProcess(ctx, image)
|
||||||
@ -86,10 +88,10 @@ func (p *ImageProcessor) processImage(image model.Image) {
|
|||||||
wg.Wait()
|
wg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
func CreateImageProcessor(logger *log.Logger, imageModel models.ImageModel) ImageProcessor {
|
func NewImageProcessor(logger *log.Logger, imageModel models.ImageModel) ImageProcessor {
|
||||||
imageProcessor := ImageProcessor{imageModel: imageModel, logger: logger}
|
imageProcessor := ImageProcessor{imageModel: imageModel, logger: logger}
|
||||||
|
|
||||||
imageProcessor.processor = NewProcessor(int(IMAGE_PROCESS_AT_A_TIME), imageProcessor.processImage)
|
imageProcessor.Processor = NewProcessor(int(IMAGE_PROCESS_AT_A_TIME), imageProcessor.processImage)
|
||||||
|
|
||||||
return imageProcessor
|
return imageProcessor
|
||||||
}
|
}
|
||||||
|
@ -11,6 +11,10 @@ func (p *Processor[TMessage]) Work() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *Processor[TMessage]) Add(msg TMessage) {
|
||||||
|
p.queue <- msg
|
||||||
|
}
|
||||||
|
|
||||||
func NewProcessor[TMessage any](bufferSize int, process func(message TMessage)) *Processor[TMessage] {
|
func NewProcessor[TMessage any](bufferSize int, process func(message TMessage)) *Processor[TMessage] {
|
||||||
return &Processor[TMessage]{
|
return &Processor[TMessage]{
|
||||||
queue: make(chan TMessage, bufferSize),
|
queue: make(chan TMessage, bufferSize),
|
||||||
|
@ -2,10 +2,13 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
"os"
|
||||||
"screenmark/screenmark/agents/client"
|
"screenmark/screenmark/agents/client"
|
||||||
"screenmark/screenmark/auth"
|
"screenmark/screenmark/auth"
|
||||||
"screenmark/screenmark/images"
|
"screenmark/screenmark/images"
|
||||||
"screenmark/screenmark/limits"
|
"screenmark/screenmark/limits"
|
||||||
|
"screenmark/screenmark/models"
|
||||||
|
"screenmark/screenmark/processor"
|
||||||
"screenmark/screenmark/stacks"
|
"screenmark/screenmark/stacks"
|
||||||
|
|
||||||
ourmiddleware "screenmark/screenmark/middleware"
|
ourmiddleware "screenmark/screenmark/middleware"
|
||||||
@ -25,9 +28,14 @@ func (client TestAiClient) GetImageInfo(imageName string, imageData []byte) (cli
|
|||||||
func setupRouter(db *sql.DB, jwtManager *ourmiddleware.JwtManager) chi.Router {
|
func setupRouter(db *sql.DB, jwtManager *ourmiddleware.JwtManager) chi.Router {
|
||||||
limitsManager := limits.CreateLimitsManager(db)
|
limitsManager := limits.CreateLimitsManager(db)
|
||||||
|
|
||||||
|
imageModel := models.NewImageModel(db)
|
||||||
|
imageProcessorLogger := createLogger("Image Processor", os.Stdout)
|
||||||
|
imageProcessor := processor.NewImageProcessor(imageProcessorLogger, imageModel)
|
||||||
|
go imageProcessor.Processor.Work()
|
||||||
|
|
||||||
stackHandler := stacks.CreateStackHandler(db, limitsManager, jwtManager)
|
stackHandler := stacks.CreateStackHandler(db, limitsManager, jwtManager)
|
||||||
authHandler := auth.CreateAuthHandler(db, jwtManager)
|
authHandler := auth.CreateAuthHandler(db, jwtManager)
|
||||||
imageHandler := images.CreateImageHandler(db, limitsManager, jwtManager)
|
imageHandler := images.CreateImageHandler(db, limitsManager, jwtManager, imageProcessor.Processor)
|
||||||
|
|
||||||
notifier := NewNotifier[Notification](10)
|
notifier := NewNotifier[Notification](10)
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user