sending notifications about new stacks
This commit is contained in:
@ -19,13 +19,63 @@ import (
|
||||
"github.com/lib/pq"
|
||||
)
|
||||
|
||||
type Notification struct {
|
||||
const (
|
||||
IMAGE_TYPE = "image"
|
||||
LIST_TYPE = "list"
|
||||
)
|
||||
|
||||
type imageNotification struct {
|
||||
Type string
|
||||
|
||||
ImageID uuid.UUID
|
||||
ImageName string
|
||||
Status string
|
||||
|
||||
Status string
|
||||
}
|
||||
|
||||
func ListenNewImageEvents(db *sql.DB, notifier *Notifier[Notification]) {
|
||||
type listNotification struct {
|
||||
Type string
|
||||
|
||||
ListID uuid.UUID
|
||||
Name string
|
||||
|
||||
Status string
|
||||
}
|
||||
|
||||
type Notification struct {
|
||||
image *imageNotification
|
||||
list *listNotification
|
||||
}
|
||||
|
||||
func getImageNotification(image imageNotification) Notification {
|
||||
return Notification{
|
||||
image: &image,
|
||||
}
|
||||
}
|
||||
|
||||
func getListNotification(list listNotification) Notification {
|
||||
return Notification{
|
||||
list: &list,
|
||||
}
|
||||
}
|
||||
|
||||
func (n Notification) MarshalJSON() ([]byte, error) {
|
||||
if n.image != nil {
|
||||
return json.Marshal(n.image)
|
||||
}
|
||||
|
||||
if n.list != nil {
|
||||
return json.Marshal(n.list)
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("no image or list present")
|
||||
}
|
||||
|
||||
func (n *Notification) UnmarshalJSON(data []byte) error {
|
||||
return fmt.Errorf("unimplemented")
|
||||
}
|
||||
|
||||
func ListenNewImageEvents(db *sql.DB) {
|
||||
listener := pq.NewListener(os.Getenv("DB_CONNECTION"), time.Second, time.Second, func(event pq.ListenerEventType, err error) {
|
||||
if err != nil {
|
||||
panic(err)
|
||||
@ -128,11 +178,12 @@ func ListenProcessingImageStatus(db *sql.DB, images models.ImageModel, notifier
|
||||
|
||||
logger.Info("Update", "id", imageStringUuid, "status", status)
|
||||
|
||||
notification := Notification{
|
||||
notification := getImageNotification(imageNotification{
|
||||
Type: IMAGE_TYPE,
|
||||
ImageID: processingImage.ImageID,
|
||||
ImageName: processingImage.Image.ImageName,
|
||||
Status: status,
|
||||
}
|
||||
})
|
||||
|
||||
if err := notifier.SendAndCreate(processingImage.UserID.String(), notification); err != nil {
|
||||
logger.Error(err)
|
||||
@ -191,6 +242,51 @@ func ListenNewStackEvents(db *sql.DB) {
|
||||
}
|
||||
}
|
||||
|
||||
func ListenProcessingStackStatus(db *sql.DB, stacks models.ListModel, notifier *Notifier[Notification]) {
|
||||
listener := pq.NewListener(os.Getenv("DB_CONNECTION"), time.Second, time.Second, func(event pq.ListenerEventType, err error) {
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
})
|
||||
defer listener.Close()
|
||||
|
||||
logger := createLogger("Stack Status 📊", os.Stdout)
|
||||
|
||||
if err := listener.Listen("new_processing_stack_status"); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
for data := range listener.Notify {
|
||||
stackStringUUID := data.Extra[0:36]
|
||||
status := data.Extra[36:]
|
||||
|
||||
stackUUID, err := uuid.Parse(stackStringUUID)
|
||||
if err != nil {
|
||||
logger.Error(err)
|
||||
continue
|
||||
}
|
||||
|
||||
processingStack, err := stacks.GetToProcess(context.Background(), stackUUID)
|
||||
if err != nil {
|
||||
logger.Error("GetToProcess failed", "err", err)
|
||||
continue
|
||||
}
|
||||
|
||||
logger.Info("Update", "id", stackStringUUID, "status", status)
|
||||
|
||||
notification := getListNotification(listNotification{
|
||||
Type: LIST_TYPE,
|
||||
Name: processingStack.Title,
|
||||
ListID: stackUUID,
|
||||
Status: status,
|
||||
})
|
||||
|
||||
if err := notifier.SendAndCreate(processingStack.UserID.String(), notification); err != nil {
|
||||
logger.Error(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* TODO: We have channels open every a user sends an image.
|
||||
* We never close these channels.
|
||||
@ -251,7 +347,8 @@ func CreateEventsHandler(notifier *Notifier[Notification]) http.HandlerFunc {
|
||||
return
|
||||
}
|
||||
|
||||
fmt.Printf("Sending msg %s\n", msg)
|
||||
fmt.Printf("Sending msg %s\n", msgString)
|
||||
|
||||
fmt.Fprintf(w, "event: data\ndata: %s\n\n", string(msgString))
|
||||
w.(http.Flusher).Flush()
|
||||
}
|
||||
|
@ -28,6 +28,7 @@ func (client TestAiClient) GetImageInfo(imageName string, imageData []byte) (cli
|
||||
|
||||
func setupRouter(db *sql.DB) chi.Router {
|
||||
imageModel := models.NewImageModel(db)
|
||||
stackModel := models.NewListModel(db)
|
||||
|
||||
stackHandler := stacks.CreateStackHandler(db)
|
||||
authHandler := auth.CreateAuthHandler(db)
|
||||
@ -37,9 +38,15 @@ func setupRouter(db *sql.DB) chi.Router {
|
||||
|
||||
// Only start event listeners if not in test environment
|
||||
if os.Getenv("GO_TEST_ENVIRONMENT") != "true" {
|
||||
go ListenNewImageEvents(db, ¬ifier)
|
||||
|
||||
// TODO: should extract these into a notification manager
|
||||
// And actually make them the same code.
|
||||
// The events are basically the same.
|
||||
|
||||
go ListenNewImageEvents(db)
|
||||
go ListenProcessingImageStatus(db, imageModel, ¬ifier)
|
||||
go ListenNewStackEvents(db)
|
||||
go ListenProcessingStackStatus(db, stackModel, ¬ifier)
|
||||
}
|
||||
|
||||
r := chi.NewRouter()
|
||||
|
@ -92,6 +92,21 @@ func (m ListModel) GetProcessing(ctx context.Context, processingListID uuid.UUID
|
||||
return list, err
|
||||
}
|
||||
|
||||
func (m ListModel) GetToProcess(ctx context.Context, listID uuid.UUID) (model.ProcessingLists, error) {
|
||||
getToProcessStmt := ProcessingLists.
|
||||
SELECT(ProcessingLists.AllColumns).
|
||||
WHERE(ProcessingLists.ID.EQ(UUID(listID)))
|
||||
|
||||
stack := []model.ProcessingLists{}
|
||||
err := getToProcessStmt.QueryContext(ctx, m.dbPool, &stack)
|
||||
|
||||
if len(stack) != 1 {
|
||||
return model.ProcessingLists{}, fmt.Errorf("Expected 1, got %d\n", len(stack))
|
||||
}
|
||||
|
||||
return stack[0], err
|
||||
}
|
||||
|
||||
// ========================================
|
||||
// UPDATE
|
||||
// ========================================
|
||||
|
@ -124,6 +124,14 @@ PERFORM pg_notify('new_stack', NEW.id::text);
|
||||
END
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
CREATE OR REPLACE FUNCTION notify_new_processing_stack_status()
|
||||
RETURNS TRIGGER AS $$
|
||||
BEGIN
|
||||
PERFORM pg_notify('new_processing_stack_status', NEW.id::text || NEW.status::text);
|
||||
RETURN NEW;
|
||||
END
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
/* -----| Triggers |----- */
|
||||
|
||||
CREATE OR REPLACE TRIGGER on_new_image AFTER INSERT
|
||||
@ -142,4 +150,10 @@ ON haystack.processing_lists
|
||||
FOR EACH ROW
|
||||
EXECUTE PROCEDURE notify_new_stacks();
|
||||
|
||||
CREATE OR REPLACE TRIGGER on_update_stack_progress
|
||||
AFTER UPDATE OF status
|
||||
ON haystack.processing_lists
|
||||
FOR EACH ROW
|
||||
EXECUTE PROCEDURE notify_new_processing_stack_status();
|
||||
|
||||
/* -----| Test Data |----- */
|
||||
|
Reference in New Issue
Block a user