reprocessing images now works
This commit is contained in:
@ -76,45 +76,27 @@ func (n *Notification) UnmarshalJSON(data []byte) error {
|
|||||||
return fmt.Errorf("unimplemented")
|
return fmt.Errorf("unimplemented")
|
||||||
}
|
}
|
||||||
|
|
||||||
func ListenNewImageEvents(db *sql.DB) {
|
func ProcessImage(log *log.Logger, db *sql.DB) func(imageID uuid.UUID) {
|
||||||
listener := pq.NewListener(os.Getenv("DB_CONNECTION"), time.Second, time.Second, func(event pq.ListenerEventType, err error) {
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
defer listener.Close()
|
|
||||||
|
|
||||||
imageModel := models.NewImageModel(db)
|
imageModel := models.NewImageModel(db)
|
||||||
listModel := models.NewListModel(db)
|
listModel := models.NewListModel(db)
|
||||||
|
|
||||||
limits := limits.CreateLimitsManager(db)
|
limits := limits.CreateLimitsManager(db)
|
||||||
|
|
||||||
databaseEventLog := createLogger("Database Events 🤖", os.Stdout)
|
ctx := context.Background()
|
||||||
databaseEventLog.SetLevel(log.DebugLevel)
|
|
||||||
|
|
||||||
err := listener.Listen("new_image")
|
return func(imageID uuid.UUID) {
|
||||||
if err != nil {
|
log.Debug("Starting processing image", "ImageID", imageID)
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
for parameters := range listener.Notify {
|
|
||||||
imageId := uuid.MustParse(parameters.Extra)
|
|
||||||
|
|
||||||
databaseEventLog.Debug("Starting processing image", "ImageID", imageId)
|
|
||||||
|
|
||||||
ctx := context.Background()
|
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
image, err := imageModel.GetToProcessWithData(ctx, imageId)
|
image, err := imageModel.GetToProcessWithData(ctx, imageID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
databaseEventLog.Error("Failed to GetToProcessWithData", "error", err)
|
log.Error("Failed to GetToProcessWithData", "error", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
splitWriter := createDbStdoutWriter(db, image.ImageID)
|
splitWriter := createDbStdoutWriter(db, image.ImageID)
|
||||||
|
|
||||||
if err := imageModel.StartProcessing(ctx, image.ID); err != nil {
|
if err := imageModel.StartProcessing(ctx, image.ID); err != nil {
|
||||||
databaseEventLog.Error("Failed to FinishProcessing", "error", err)
|
log.Error("Failed to FinishProcessing", "error", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -140,15 +122,37 @@ func ListenNewImageEvents(db *sql.DB) {
|
|||||||
|
|
||||||
_, err = imageModel.FinishProcessing(ctx, image.ID)
|
_, err = imageModel.FinishProcessing(ctx, image.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
databaseEventLog.Error("Failed to finish processing", "ImageID", imageId, "error", err)
|
log.Error("Failed to finish processing", "ImageID", imageID, "error", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
databaseEventLog.Debug("Finished processing image", "ImageID", imageId)
|
log.Debug("Finished processing image", "ImageID", imageID)
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func ListenNewImageEvents(db *sql.DB) {
|
||||||
|
listener := pq.NewListener(os.Getenv("DB_CONNECTION"), time.Second, time.Second, func(event pq.ListenerEventType, err error) {
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
defer listener.Close()
|
||||||
|
|
||||||
|
databaseEventLog := createLogger("Database Events 🤖", os.Stdout)
|
||||||
|
databaseEventLog.SetLevel(log.DebugLevel)
|
||||||
|
|
||||||
|
err := listener.Listen("new_image")
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for parameters := range listener.Notify {
|
||||||
|
imageID := uuid.MustParse(parameters.Extra)
|
||||||
|
ProcessImage(databaseEventLog, db)(imageID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func ListenProcessingImageStatus(db *sql.DB, images models.ImageModel, notifier *Notifier[Notification]) {
|
func ListenProcessingImageStatus(db *sql.DB, images models.ImageModel, notifier *Notifier[Notification]) {
|
||||||
listener := pq.NewListener(os.Getenv("DB_CONNECTION"), time.Second, time.Second, func(event pq.ListenerEventType, err error) {
|
listener := pq.NewListener(os.Getenv("DB_CONNECTION"), time.Second, time.Second, func(event pq.ListenerEventType, err error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -20,10 +20,14 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type ImageHandler struct {
|
type ImageHandler struct {
|
||||||
logger *log.Logger
|
logger *log.Logger
|
||||||
imageModel models.ImageModel
|
|
||||||
userModel models.UserModel
|
imageModel models.ImageModel
|
||||||
|
userModel models.UserModel
|
||||||
|
|
||||||
limitsManager limits.LimitsManagerMethods
|
limitsManager limits.LimitsManagerMethods
|
||||||
|
|
||||||
|
processImage func(imageID uuid.UUID)
|
||||||
}
|
}
|
||||||
|
|
||||||
type ImagesReturn struct {
|
type ImagesReturn struct {
|
||||||
@ -200,16 +204,30 @@ func (h *ImageHandler) reprocessImage(w http.ResponseWriter, r *http.Request) {
|
|||||||
|
|
||||||
imageToProcessID, err := h.imageModel.GetImageToProcessID(ctx, imageID)
|
imageToProcessID, err := h.imageModel.GetImageToProcessID(ctx, imageID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
h.logger.Error("get image to process", "err", err)
|
||||||
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// The whole way in which I do this event driven stuff is stupid.
|
||||||
|
// It's so messy now
|
||||||
|
|
||||||
|
err = h.imageModel.DeleteUserImage(ctx, imageID)
|
||||||
|
if err != nil {
|
||||||
|
h.logger.Error("delete user image", "err", err)
|
||||||
w.WriteHeader(http.StatusInternalServerError)
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
err = h.imageModel.SetNotStarted(ctx, imageToProcessID)
|
err = h.imageModel.SetNotStarted(ctx, imageToProcessID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
h.logger.Error("set not started", "err", err)
|
||||||
w.WriteHeader(http.StatusInternalServerError)
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
h.processImage(imageToProcessID)
|
||||||
|
|
||||||
w.WriteHeader(http.StatusOK)
|
w.WriteHeader(http.StatusOK)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -226,11 +244,12 @@ func (h *ImageHandler) CreateRoutes(r chi.Router) {
|
|||||||
|
|
||||||
r.Get("/", h.listImages)
|
r.Get("/", h.listImages)
|
||||||
r.Post("/{name}", middleware.WithLimit(h.logger, h.limitsManager.HasReachedImageLimit, h.uploadImage))
|
r.Post("/{name}", middleware.WithLimit(h.logger, h.limitsManager.HasReachedImageLimit, h.uploadImage))
|
||||||
|
r.Patch("/{image-id}", h.reprocessImage)
|
||||||
r.Delete("/{image-id}", h.deleteImage)
|
r.Delete("/{image-id}", h.deleteImage)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func CreateImageHandler(db *sql.DB, limitsManager limits.LimitsManagerMethods) ImageHandler {
|
func CreateImageHandler(db *sql.DB, limitsManager limits.LimitsManagerMethods, processImage func(imageID uuid.UUID)) ImageHandler {
|
||||||
imageModel := models.NewImageModel(db)
|
imageModel := models.NewImageModel(db)
|
||||||
userModel := models.NewUserModel(db)
|
userModel := models.NewUserModel(db)
|
||||||
logger := log.New(os.Stdout).WithPrefix("Images")
|
logger := log.New(os.Stdout).WithPrefix("Images")
|
||||||
@ -240,5 +259,6 @@ func CreateImageHandler(db *sql.DB, limitsManager limits.LimitsManagerMethods) I
|
|||||||
imageModel: imageModel,
|
imageModel: imageModel,
|
||||||
userModel: userModel,
|
userModel: userModel,
|
||||||
limitsManager: limitsManager,
|
limitsManager: limitsManager,
|
||||||
|
processImage: processImage,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -230,6 +230,15 @@ func (m ImageModel) AddDescription(ctx context.Context, imageId uuid.UUID, descr
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m ImageModel) DeleteUserImage(ctx context.Context, imageID uuid.UUID) error {
|
||||||
|
deleteImageStmt := UserImages.DELETE().
|
||||||
|
WHERE(UserImages.ImageID.EQ(UUID(imageID)))
|
||||||
|
|
||||||
|
_, err := deleteImageStmt.ExecContext(ctx, m.dbPool)
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
func (m ImageModel) Delete(ctx context.Context, imageID uuid.UUID) error {
|
func (m ImageModel) Delete(ctx context.Context, imageID uuid.UUID) error {
|
||||||
deleteImageStmt := Image.DELETE().
|
deleteImageStmt := Image.DELETE().
|
||||||
WHERE(Image.ID.EQ(UUID(imageID)))
|
WHERE(Image.ID.EQ(UUID(imageID)))
|
||||||
|
@ -30,9 +30,12 @@ func setupRouter(db *sql.DB) chi.Router {
|
|||||||
|
|
||||||
limitsManager := limits.CreateLimitsManager(db)
|
limitsManager := limits.CreateLimitsManager(db)
|
||||||
|
|
||||||
|
processImageLogger := createLogger("Process Image", os.Stdout)
|
||||||
|
processImage := ProcessImage(processImageLogger, db)
|
||||||
|
|
||||||
stackHandler := stacks.CreateStackHandler(db, limitsManager)
|
stackHandler := stacks.CreateStackHandler(db, limitsManager)
|
||||||
authHandler := auth.CreateAuthHandler(db)
|
authHandler := auth.CreateAuthHandler(db)
|
||||||
imageHandler := images.CreateImageHandler(db, limitsManager)
|
imageHandler := images.CreateImageHandler(db, limitsManager, processImage)
|
||||||
|
|
||||||
notifier := NewNotifier[Notification](10)
|
notifier := NewNotifier[Notification](10)
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user