refactor: tables for image and processing_image

This allows a single table to be used to process images, meaning if
anything happens to the system we can always return to polling the
database and process these images individually.

Because of this we also want an `image` table to contain the actual
binary data for the image, so we aren't selecting and writing it each
time, as it is potentially a bottleneck.
This commit is contained in:
2025-02-26 20:01:56 +00:00
parent 410270e217
commit d8095b0c67
10 changed files with 362 additions and 41 deletions

View File

@ -0,0 +1,18 @@
//
// Code generated by go-jet DO NOT EDIT.
//
// WARNING: Changes to this file may cause incorrect behavior
// and will be lost if the code is regenerated
//
package model
import (
"github.com/google/uuid"
)
type Image struct {
ID uuid.UUID `sql:"primary_key"`
ImageName string
Image []byte
}

View File

@ -13,7 +13,6 @@ import (
type UserImages struct {
ID uuid.UUID `sql:"primary_key"`
ImageName string
Image []byte
ImageID uuid.UUID
UserID uuid.UUID
}

View File

@ -0,0 +1,18 @@
//
// Code generated by go-jet DO NOT EDIT.
//
// WARNING: Changes to this file may cause incorrect behavior
// and will be lost if the code is regenerated
//
package model
import (
"github.com/google/uuid"
)
type UserImagesToProcess struct {
ID uuid.UUID `sql:"primary_key"`
ImageID uuid.UUID
UserID uuid.UUID
}

View File

@ -0,0 +1,81 @@
//
// Code generated by go-jet DO NOT EDIT.
//
// WARNING: Changes to this file may cause incorrect behavior
// and will be lost if the code is regenerated
//
package table
import (
"github.com/go-jet/jet/v2/postgres"
)
var Image = newImageTable("haystack", "image", "")
type imageTable struct {
postgres.Table
// Columns
ID postgres.ColumnString
ImageName postgres.ColumnString
Image postgres.ColumnString
AllColumns postgres.ColumnList
MutableColumns postgres.ColumnList
}
type ImageTable struct {
imageTable
EXCLUDED imageTable
}
// AS creates new ImageTable with assigned alias
func (a ImageTable) AS(alias string) *ImageTable {
return newImageTable(a.SchemaName(), a.TableName(), alias)
}
// Schema creates new ImageTable with assigned schema name
func (a ImageTable) FromSchema(schemaName string) *ImageTable {
return newImageTable(schemaName, a.TableName(), a.Alias())
}
// WithPrefix creates new ImageTable with assigned table prefix
func (a ImageTable) WithPrefix(prefix string) *ImageTable {
return newImageTable(a.SchemaName(), prefix+a.TableName(), a.TableName())
}
// WithSuffix creates new ImageTable with assigned table suffix
func (a ImageTable) WithSuffix(suffix string) *ImageTable {
return newImageTable(a.SchemaName(), a.TableName()+suffix, a.TableName())
}
func newImageTable(schemaName, tableName, alias string) *ImageTable {
return &ImageTable{
imageTable: newImageTableImpl(schemaName, tableName, alias),
EXCLUDED: newImageTableImpl("", "excluded", ""),
}
}
func newImageTableImpl(schemaName, tableName, alias string) imageTable {
var (
IDColumn = postgres.StringColumn("id")
ImageNameColumn = postgres.StringColumn("image_name")
ImageColumn = postgres.StringColumn("image")
allColumns = postgres.ColumnList{IDColumn, ImageNameColumn, ImageColumn}
mutableColumns = postgres.ColumnList{ImageNameColumn, ImageColumn}
)
return imageTable{
Table: postgres.NewTable(schemaName, tableName, alias, allColumns...),
//Columns
ID: IDColumn,
ImageName: ImageNameColumn,
Image: ImageColumn,
AllColumns: allColumns,
MutableColumns: mutableColumns,
}
}

View File

@ -10,9 +10,11 @@ package table
// UseSchema sets a new schema name for all generated table SQL builder types. It is recommended to invoke
// this method only once at the beginning of the program.
func UseSchema(schema string) {
Image = Image.FromSchema(schema)
ImageLinks = ImageLinks.FromSchema(schema)
ImageTags = ImageTags.FromSchema(schema)
ImageText = ImageText.FromSchema(schema)
UserImages = UserImages.FromSchema(schema)
UserImagesToProcess = UserImagesToProcess.FromSchema(schema)
Users = Users.FromSchema(schema)
}

View File

@ -18,8 +18,7 @@ type userImagesTable struct {
// Columns
ID postgres.ColumnString
ImageName postgres.ColumnString
Image postgres.ColumnString
ImageID postgres.ColumnString
UserID postgres.ColumnString
AllColumns postgres.ColumnList
@ -62,11 +61,10 @@ func newUserImagesTable(schemaName, tableName, alias string) *UserImagesTable {
func newUserImagesTableImpl(schemaName, tableName, alias string) userImagesTable {
var (
IDColumn = postgres.StringColumn("id")
ImageNameColumn = postgres.StringColumn("image_name")
ImageColumn = postgres.StringColumn("image")
ImageIDColumn = postgres.StringColumn("image_id")
UserIDColumn = postgres.StringColumn("user_id")
allColumns = postgres.ColumnList{IDColumn, ImageNameColumn, ImageColumn, UserIDColumn}
mutableColumns = postgres.ColumnList{ImageNameColumn, ImageColumn, UserIDColumn}
allColumns = postgres.ColumnList{IDColumn, ImageIDColumn, UserIDColumn}
mutableColumns = postgres.ColumnList{ImageIDColumn, UserIDColumn}
)
return userImagesTable{
@ -74,8 +72,7 @@ func newUserImagesTableImpl(schemaName, tableName, alias string) userImagesTable
//Columns
ID: IDColumn,
ImageName: ImageNameColumn,
Image: ImageColumn,
ImageID: ImageIDColumn,
UserID: UserIDColumn,
AllColumns: allColumns,

View File

@ -0,0 +1,81 @@
//
// Code generated by go-jet DO NOT EDIT.
//
// WARNING: Changes to this file may cause incorrect behavior
// and will be lost if the code is regenerated
//
package table
import (
"github.com/go-jet/jet/v2/postgres"
)
var UserImagesToProcess = newUserImagesToProcessTable("haystack", "user_images_to_process", "")
type userImagesToProcessTable struct {
postgres.Table
// Columns
ID postgres.ColumnString
ImageID postgres.ColumnString
UserID postgres.ColumnString
AllColumns postgres.ColumnList
MutableColumns postgres.ColumnList
}
type UserImagesToProcessTable struct {
userImagesToProcessTable
EXCLUDED userImagesToProcessTable
}
// AS creates new UserImagesToProcessTable with assigned alias
func (a UserImagesToProcessTable) AS(alias string) *UserImagesToProcessTable {
return newUserImagesToProcessTable(a.SchemaName(), a.TableName(), alias)
}
// Schema creates new UserImagesToProcessTable with assigned schema name
func (a UserImagesToProcessTable) FromSchema(schemaName string) *UserImagesToProcessTable {
return newUserImagesToProcessTable(schemaName, a.TableName(), a.Alias())
}
// WithPrefix creates new UserImagesToProcessTable with assigned table prefix
func (a UserImagesToProcessTable) WithPrefix(prefix string) *UserImagesToProcessTable {
return newUserImagesToProcessTable(a.SchemaName(), prefix+a.TableName(), a.TableName())
}
// WithSuffix creates new UserImagesToProcessTable with assigned table suffix
func (a UserImagesToProcessTable) WithSuffix(suffix string) *UserImagesToProcessTable {
return newUserImagesToProcessTable(a.SchemaName(), a.TableName()+suffix, a.TableName())
}
func newUserImagesToProcessTable(schemaName, tableName, alias string) *UserImagesToProcessTable {
return &UserImagesToProcessTable{
userImagesToProcessTable: newUserImagesToProcessTableImpl(schemaName, tableName, alias),
EXCLUDED: newUserImagesToProcessTableImpl("", "excluded", ""),
}
}
func newUserImagesToProcessTableImpl(schemaName, tableName, alias string) userImagesToProcessTable {
var (
IDColumn = postgres.StringColumn("id")
ImageIDColumn = postgres.StringColumn("image_id")
UserIDColumn = postgres.StringColumn("user_id")
allColumns = postgres.ColumnList{IDColumn, ImageIDColumn, UserIDColumn}
mutableColumns = postgres.ColumnList{ImageIDColumn, UserIDColumn}
)
return userImagesToProcessTable{
Table: postgres.NewTable(schemaName, tableName, alias, allColumns...),
//Columns
ID: IDColumn,
ImageID: ImageIDColumn,
UserID: UserIDColumn,
AllColumns: allColumns,
MutableColumns: mutableColumns,
}
}

View File

@ -28,7 +28,13 @@ func (client TestAiClient) GetImageInfo(imageName string, imageData []byte) (Ima
func GetAiClient() (AiClient, error) {
mode := os.Getenv("MODE")
if mode == "TESTING" {
return TestAiClient{}, nil
return TestAiClient{
ImageInfo: ImageInfo{
Tags: []string{"tag"},
Links: []string{"links"},
Text: []string{"text"},
},
}, nil
}
return CreateOpenAiClient()
@ -65,7 +71,9 @@ func main() {
select {
case parameters := <-listener.Notify:
log.Println("received notification, new image available: " + parameters.Extra)
imageId := parameters.Extra
log.Println("received notification, new image available: " + imageId)
go func() {
openAiClient, err := GetAiClient()
@ -73,24 +81,33 @@ func main() {
panic(err)
}
image, err := models.GetImage(parameters.Extra)
image, err := models.GetImageToProcessWithData(imageId)
if err != nil {
log.Println("1")
log.Println(err)
return
}
imageInfo, err := openAiClient.GetImageInfo(image.ImageName, image.Image)
imageInfo, err := openAiClient.GetImageInfo(image.Image.ImageName, image.Image.Image)
if err != nil {
log.Println("2")
log.Println(err)
return
}
log.Println("Finished processing image " + parameters.Extra)
savedImage, err := models.SaveImage(image.ID)
if err != nil {
log.Println("3")
log.Println(err)
return
}
log.Println("Finished processing image " + imageId)
log.Printf("Image attributes: %+v\n", imageInfo)
models.SaveImageTags(parameters.Extra, imageInfo.Tags)
models.SaveImageLinks(parameters.Extra, imageInfo.Links)
models.SaveImageTexts(parameters.Extra, imageInfo.Text)
models.SaveImageTags(savedImage.ID.String(), imageInfo.Tags)
models.SaveImageLinks(savedImage.ID.String(), imageInfo.Links)
models.SaveImageTexts(savedImage.ID.String(), imageInfo.Text)
}()
}
}
@ -140,11 +157,11 @@ func main() {
}
// TODO: this could be part of the db table
extension := filepath.Ext(image.ImageName)
extension := filepath.Ext(image.Image.ImageName)
extension = extension[1:]
w.Header().Add("Content-Type", "image/"+extension)
w.Write(image.Image)
w.Write(image.Image.Image)
})
r.Post("/image/{name}", func(w http.ResponseWriter, r *http.Request) {
@ -165,13 +182,15 @@ func main() {
image, err := io.ReadAll(r.Body)
if err != nil {
log.Println("First case")
w.WriteHeader(http.StatusBadRequest)
fmt.Fprintf(w, "Couldnt read the image from the request body")
return
}
userImage, err := models.SaveImage(userId, imageName, image)
userImage, err := models.SaveImageToProcess(userId, imageName, image)
if err != nil {
log.Println("Second case")
log.Println(err)
w.WriteHeader(http.StatusBadRequest)
fmt.Fprintf(w, "Could not save image to DB")
@ -180,6 +199,7 @@ func main() {
jsonUserImage, err := json.Marshal(userImage)
if err != nil {
log.Println("Third case")
log.Println(err)
w.WriteHeader(http.StatusBadRequest)
fmt.Fprintf(w, "Could not create JSON response for this image")

View File

@ -11,24 +11,113 @@ import (
"github.com/google/uuid"
)
func SaveImage(userId string, imageName string, imageData []byte) (model.UserImages, error) {
stmt := UserImages.INSERT(UserImages.UserID, UserImages.ImageName, UserImages.Image).VALUES(userId, imageName, imageData).RETURNING(UserImages.ID, UserImages.UserID, UserImages.ImageName)
func SaveImageToProcess(userId string, imageName string, imageData []byte) (model.UserImagesToProcess, error) {
insertImageStmt := Image.INSERT(Image.ImageName, Image.Image).VALUES(imageName, imageData).RETURNING(Image.ID)
userImage := model.UserImages{}
err := stmt.Query(db, &userImage)
// TODO: should be a transaction
image := model.Image{}
err := insertImageStmt.Query(db, &image)
if err != nil {
return model.UserImagesToProcess{}, err
}
stmt := UserImagesToProcess.INSERT(UserImagesToProcess.UserID, UserImagesToProcess.ImageID).VALUES(userId, image.ID).RETURNING(UserImagesToProcess.AllColumns)
userImage := model.UserImagesToProcess{}
err = stmt.Query(db, &userImage)
return userImage, err
}
func GetImage(imageId string) (model.UserImages, error) {
func removeImageToProcess(imageId string) error {
id := uuid.MustParse(imageId)
stmt := UserImages.SELECT(UserImages.AllColumns).WHERE(UserImages.ID.EQ(UUID(id)))
images := []model.UserImages{}
stmt := UserImagesToProcess.DELETE().WHERE(UserImagesToProcess.ID.EQ(UUID(id)))
fmt.Println(stmt.DebugSql())
_, err := stmt.Exec(db)
return err
}
func SaveImage(imageId uuid.UUID) (model.UserImages, error) {
imageToProcess, err := GetImageToProcess(imageId.String())
if err != nil {
return model.UserImages{}, err
}
stmt := UserImages.INSERT(UserImages.UserID, UserImages.ImageID).VALUES(imageToProcess.UserID, imageToProcess.ImageID).RETURNING(UserImages.ID, UserImages.UserID, UserImages.ImageID)
userImage := model.UserImages{}
err = stmt.Query(db, &userImage)
if err != nil {
return model.UserImages{}, err
}
err = removeImageToProcess(imageId.String())
if err != nil {
return model.UserImages{}, err
}
return userImage, err
}
type ImageData struct {
model.UserImages
Image model.Image
}
func GetImage(imageId string) (ImageData, error) {
id := uuid.MustParse(imageId)
stmt := SELECT(UserImages.AllColumns, Image.AllColumns).FROM(UserImages.INNER_JOIN(Image, Image.ID.EQ(UserImages.ImageID))).WHERE(UserImages.ID.EQ(UUID(id)))
images := []ImageData{}
err := stmt.Query(db, &images)
if len(images) != 1 {
return model.UserImages{}, errors.New(fmt.Sprintf("Expected 1, got %d\n", len(images)))
return ImageData{}, errors.New(fmt.Sprintf("Expected 1, got %d\n", len(images)))
}
return images[0], err
}
type ImageToProcessData struct {
model.UserImagesToProcess
Image model.Image
}
func GetImageToProcessWithData(imageId string) (ImageToProcessData, error) {
id := uuid.MustParse(imageId)
// stmt := UserImagesToProcess.SELECT(UserImages.AllColumns).WHERE(UserImages.ID.EQ(UUID(id)))
// TODO: Image should be `Images`
stmt := SELECT(UserImagesToProcess.AllColumns, Image.AllColumns).FROM(UserImagesToProcess.INNER_JOIN(Image, Image.ID.EQ(UserImagesToProcess.ImageID))).WHERE(UserImagesToProcess.ID.EQ(UUID(id)))
images := []ImageToProcessData{}
err := stmt.Query(db, &images)
if len(images) != 1 {
return ImageToProcessData{}, errors.New(fmt.Sprintf("Expected 1, got %d\n", len(images)))
}
return images[0], err
}
func GetImageToProcess(imageId string) (model.UserImagesToProcess, error) {
id := uuid.MustParse(imageId)
stmt := UserImagesToProcess.SELECT(UserImagesToProcess.AllColumns).WHERE(UserImagesToProcess.ID.EQ(UUID(id)))
fmt.Println(stmt.DebugSql())
images := []model.UserImagesToProcess{}
err := stmt.Query(db, &images)
if len(images) != 1 {
return model.UserImagesToProcess{}, errors.New(fmt.Sprintf("Expected 1, got %d\n", len(images)))
}
return images[0], err
@ -37,6 +126,9 @@ func GetImage(imageId string) (model.UserImages, error) {
type UserImagesWithInfo struct {
model.UserImages
// TODO: this shit
Image model.Image
Tags []model.ImageTags
Links []model.ImageLinks
Text []model.ImageText
@ -44,7 +136,7 @@ type UserImagesWithInfo struct {
func GetUserImages(userId string) ([]UserImagesWithInfo, error) {
id := uuid.MustParse(userId)
stmt := SELECT(UserImages.ID, UserImages.ImageName, ImageTags.AllColumns, ImageText.AllColumns, ImageLinks.AllColumns).FROM(UserImages.LEFT_JOIN(ImageTags, ImageTags.ImageID.EQ(UserImages.ID)).LEFT_JOIN(ImageText, ImageText.ImageID.EQ(UserImages.ID)).LEFT_JOIN(ImageLinks, ImageLinks.ImageID.EQ(UserImages.ID))).WHERE(UserImages.UserID.EQ(UUID(id)))
stmt := SELECT(UserImages.AllColumns, ImageTags.AllColumns, ImageText.AllColumns, ImageLinks.AllColumns).FROM(UserImages.LEFT_JOIN(ImageTags, ImageTags.ImageID.EQ(UserImages.ID)).LEFT_JOIN(ImageText, ImageText.ImageID.EQ(UserImages.ID)).LEFT_JOIN(ImageLinks, ImageLinks.ImageID.EQ(UserImages.ID))).WHERE(UserImages.UserID.EQ(UUID(id)))
images := []UserImagesWithInfo{}
err := stmt.Query(db, &images)
@ -63,6 +155,8 @@ func SaveImageTags(imageId string, tags []string) ([]model.ImageTags, error) {
stmt.RETURNING(ImageTags.AllColumns)
fmt.Println(stmt.DebugSql())
imageTags := []model.ImageTags{}
err := stmt.Query(db, &imageTags)

View File

@ -11,10 +11,21 @@ CREATE TABLE haystack.users (
id uuid PRIMARY KEY DEFAULT gen_random_uuid()
);
CREATE TABLE haystack.user_images (
CREATE TABLE haystack.image (
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
image_name TEXT NOT NULL,
image BYTEA NOT NULL,
image BYTEA NOT NULL
);
CREATE TABLE haystack.user_images_to_process (
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
image_id uuid NOT NULL UNIQUE REFERENCES haystack.image (id),
user_id uuid NOT NULL REFERENCES haystack.users (id)
);
CREATE TABLE haystack.user_images (
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
image_id uuid NOT NULL UNIQUE REFERENCES haystack.image (id),
user_id uuid NOT NULL REFERENCES haystack.users (id)
);
@ -49,7 +60,7 @@ $$ LANGUAGE plpgsql;
/* -----| Triggers |----- */
CREATE OR REPLACE TRIGGER on_new_image AFTER INSERT
ON haystack.user_images
ON haystack.user_images_to_process
FOR EACH ROW
EXECUTE PROCEDURE notify_new_image();