// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package archiver

import (
	"context"
	"sync"
	"time"

	"github.com/ClusterCockpit/cc-backend/internal/repository"
	"github.com/ClusterCockpit/cc-backend/pkg/log"
	"github.com/ClusterCockpit/cc-backend/pkg/schema"
	sq "github.com/Masterminds/squirrel"
)

var (
	archivePending sync.WaitGroup
	archiveChannel chan *schema.Job
	jobRepo        *repository.JobRepository
)

func Start(r *repository.JobRepository) {
	archiveChannel = make(chan *schema.Job, 128)
	jobRepo = r

	go archivingWorker()
}

// Archiving worker thread
func archivingWorker() {
	for {
		select {
		case job, ok := <-archiveChannel:
			if !ok {
				break
			}
			start := time.Now()
			// not using meta data, called to load JobMeta into Cache?
			// will fail if job meta not in repository
			if _, err := jobRepo.FetchMetadata(job); err != nil {
				log.Errorf("archiving job (dbid: %d) failed at check metadata step: %s", job.ID, err.Error())
				jobRepo.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed)
				continue
			}

			// ArchiveJob will fetch all the data from a MetricDataRepository and push into configured archive backend
			// TODO: Maybe use context with cancel/timeout here
			jobMeta, err := ArchiveJob(job, context.Background())
			if err != nil {
				log.Errorf("archiving job (dbid: %d) failed at archiving job step: %s", job.ID, err.Error())
				jobRepo.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed)
				continue
			}

			stmt := sq.Update("job").Where("job.id = ?", job.ID)

			if stmt, err = jobRepo.UpdateFootprint(stmt, jobMeta); err != nil {
				log.Errorf("archiving job (dbid: %d) failed at update Footprint step: %s", job.ID, err.Error())
				continue
			}
			if stmt, err = jobRepo.UpdateEnergy(stmt, jobMeta); err != nil {
				log.Errorf("archiving job (dbid: %d) failed at update Energy step: %s", job.ID, err.Error())
				continue
			}
			// Update the jobs database entry one last time:
			stmt = jobRepo.MarkArchived(stmt, schema.MonitoringStatusArchivingSuccessful)
			if err := jobRepo.Execute(stmt); err != nil {
				log.Errorf("archiving job (dbid: %d) failed at db execute: %s", job.ID, err.Error())
				continue
			}
			log.Debugf("archiving job %d took %s", job.JobID, time.Since(start))
			log.Printf("archiving job (dbid: %d) successful", job.ID)
			archivePending.Done()
		}
	}
}

// Trigger async archiving
func TriggerArchiving(job *schema.Job) {
	if archiveChannel == nil {
		log.Fatal("Cannot archive without archiving channel. Did you Start the archiver?")
	}

	archivePending.Add(1)
	archiveChannel <- job
}

// Wait for background thread to finish pending archiving operations
func WaitForArchiving() {
	// close channel and wait for worker to process remaining jobs
	archivePending.Wait()
}