diff --git a/WORKSPACE b/WORKSPACE index 01b23ed..76c17f3 100644 --- a/WORKSPACE +++ b/WORKSPACE @@ -104,8 +104,8 @@ oci_pull( oci_pull( name = "runner_base", - # This digest is of the nightly/main tag as of 2024-12-06 - digest = "sha256:35c8eaac721350ca6ef3bfb3e6080c5412ddb9061299c62bb4fd2fc6df8d0227", + # This digest is of the nightly/main tag as of 2024-12-28 + digest = "sha256:b1bcfaf52c418faabcd6b6fcbbec0f27f543a10b4a9384d75811d03dbba0d4d7", image = "ghcr.io/rmi-pacta/workflow.pacta.webapp", platforms = ["linux/amd64"], ) diff --git a/async/async.go b/async/async.go index 42a1985..caf09ad 100644 --- a/async/async.go +++ b/async/async.go @@ -86,7 +86,6 @@ func New(cfg *Config) (*Handler, error) { // TODO: Send a notification when parsing fails. func (h *Handler) ParsePortfolio(ctx context.Context, taskID task.ID, req *task.ParsePortfolioRequest, destPortfolioContainer string) error { - // Make the directories we require first. We use these instead of // /mnt/{input,output} because the base image (quite reasonably) uses a non-root // user, so we can't be creating directories in the root filesystem all willy @@ -231,8 +230,99 @@ func (h *Handler) ParsePortfolio(ctx context.Context, taskID task.ID, req *task. return nil } -func (h *Handler) CreateAudit(ctx context.Context, taskID task.ID, req *task.CreateAuditRequest) error { - return errors.New("not implemented") +func (h *Handler) CreateAudit(ctx context.Context, taskID task.ID, req *task.CreateAuditRequest, auditContainer string) error { + if n := len(req.BlobURIs); n != 1 { + return fmt.Errorf("expected exactly one blob URI as input, got %d", n) + } + blobURI := req.BlobURIs[0] + + // We use this instead of /mnt/... because the base image (quite + // reasonably) uses a non-root user, so we can't be creating directories in the + // root filesystem all willy nilly. + baseDir := filepath.Join("/", "home", "workflow-pacta-webapp") + + // We don't use the benchmark or PACTA data here, it's just convenient to use the same directory-creating harness for everything. + auditEnv, err := initEnv(h.benchmarkDir, h.pactaDataDir, baseDir, "create-audit") + if err != nil { + return fmt.Errorf("failed to init report env: %w", err) + } + + // Load the parsed portfolio from blob storage, place it in our PORFOLIO_DIR, + // where the `run_pacta.R` script expects it to be. + fileNameWithExt := filepath.Base(string(blobURI)) + if !strings.HasSuffix(fileNameWithExt, ".csv") { + return fmt.Errorf("given blob wasn't a CSV-formatted portfolio, %q", fileNameWithExt) + } + destPath := filepath.Join(auditEnv.pathForDir(PortfoliosDir), fileNameWithExt) + if err := h.downloadBlob(ctx, string(blobURI), destPath); err != nil { + return fmt.Errorf("failed to download processed portfolio blob: %w", err) + } + + inp := AuditInput{ + Portfolio: AuditInputPortfolio{ + Files: []string{fileNameWithExt}, + HoldingsDate: "2023-12-31", // TODO(#206) + Name: "FooPortfolio", // TODO(#206) + }, + Inherit: "GENERAL_2023Q4", // TODO(#206): Should this be configurable + } + + var inpJSON bytes.Buffer + if err := json.NewEncoder(&inpJSON).Encode(inp); err != nil { + return fmt.Errorf("failed to encode audit input as JSON: %w", err) + } + + cmd := exec.CommandContext(ctx, "/workflow.pacta.webapp/inst/extdata/scripts/run_audit.sh", inpJSON.String()) + cmd.Env = append(cmd.Env, auditEnv.asEnvVars()...) + cmd.Env = append(cmd.Env, + "LOG_LEVEL=DEBUG", + "HOME=/root", /* Required by pandoc */ + ) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + + if err := cmd.Run(); err != nil { + return fmt.Errorf("failed to run pacta test CLI: %w", err) + } + + var artifacts []*task.AnalysisArtifact + uploadDir := func(dir string) error { + aas, err := h.uploadDirectory(ctx, dir, auditContainer, req.AnalysisID) + if err != nil { + return fmt.Errorf("failed to upload report directory: %w", err) + } + artifacts = append(artifacts, aas...) + return nil + } + + for _, outDir := range auditEnv.outputDirs() { + if err := uploadDir(outDir); err != nil { + return fmt.Errorf("failed to upload artifacts %q: %w", outDir, err) + } + } + + events := []publisher.Event{ + { + Data: task.CreateAuditResponse{ + TaskID: taskID, + Request: req, + Artifacts: artifacts, + }, + DataVersion: to.Ptr("1.0"), + EventType: to.Ptr("created-audit"), + EventTime: to.Ptr(time.Now()), + ID: to.Ptr(string(taskID)), + Subject: to.Ptr(string(taskID)), + }, + } + + if _, err := h.pubsub.PublishEvents(ctx, events, nil); err != nil { + return fmt.Errorf("failed to publish event: %w", err) + } + + h.logger.Info("created report", zap.String("task_id", string(taskID))) + + return nil } type ReportInput struct { @@ -246,6 +336,17 @@ type ReportInputPortfolio struct { Name string `json:"name"` } +type AuditInput struct { + Portfolio AuditInputPortfolio `json:"portfolio"` + Inherit string `json:"inherit"` +} + +type AuditInputPortfolio struct { + Files []string `json:"files"` + HoldingsDate string `json:"holdingsDate"` + Name string `json:"name"` +} + type DashboardInput struct { Portfolio DashboardInputPortfolio `json:"portfolio"` Inherit string `json:"inherit"` diff --git a/cmd/runner/configs/dev.conf b/cmd/runner/configs/dev.conf index 7e3d41b..01fa32d 100644 --- a/cmd/runner/configs/dev.conf +++ b/cmd/runner/configs/dev.conf @@ -6,3 +6,4 @@ azure_topic_location centralus-1 azure_storage_account rmipactadev azure_report_container reports +azure_audit_container audits diff --git a/cmd/runner/configs/local.conf b/cmd/runner/configs/local.conf index 27cf204..fce1a9a 100644 --- a/cmd/runner/configs/local.conf +++ b/cmd/runner/configs/local.conf @@ -6,6 +6,7 @@ azure_topic_location centralus-1 azure_storage_account rmipactalocal azure_report_container reports +azure_audit_container audits benchmark_dir /mnt/workflow-data/benchmarks/2023Q4_20240529T002355Z pacta_data_dir /mnt/workflow-data/pacta-data/2023Q4_20240424T120055Z diff --git a/cmd/runner/configs/test.conf b/cmd/runner/configs/test.conf index 7907224..fe9854d 100644 --- a/cmd/runner/configs/test.conf +++ b/cmd/runner/configs/test.conf @@ -6,3 +6,4 @@ azure_topic_location westeurope-1 azure_storage_account rmipactatest azure_report_container reports +azure_audit_container audits diff --git a/cmd/runner/main.go b/cmd/runner/main.go index 2a3e45d..0b9724d 100644 --- a/cmd/runner/main.go +++ b/cmd/runner/main.go @@ -45,6 +45,7 @@ func run(args []string) error { azStorageAccount = fs.String("azure_storage_account", "", "The storage account to authenticate against for blob operations") azReportContainer = fs.String("azure_report_container", "", "The container in the storage account where we write generated portfolio reports to") + azAuditContainer = fs.String("azure_audit_container", "", "The container in the storage account where we write generated portfolio audits to") minLogLevel zapcore.Level = zapcore.DebugLevel ) @@ -97,7 +98,9 @@ func run(args []string) error { task.CreateReport: toRunFn(async.LoadCreateReportRequestFromEnv, func(ctx context.Context, id task.ID, req *task.CreateReportRequest) error { return h.CreateReport(ctx, id, req, *azReportContainer) }), - task.CreateAudit: toRunFn(async.LoadCreateAuditRequestFromEnv, h.CreateAudit), + task.CreateAudit: toRunFn(async.LoadCreateAuditRequestFromEnv, func(ctx context.Context, id task.ID, req *task.CreateAuditRequest) error { + return h.CreateAudit(ctx, id, req, *azAuditContainer) + }), } taskID := task.ID(os.Getenv("TASK_ID")) diff --git a/frontend/components/analysis/AccessButtons.vue b/frontend/components/analysis/AccessButtons.vue index c1845bc..5edbd4c 100644 --- a/frontend/components/analysis/AccessButtons.vue +++ b/frontend/components/analysis/AccessButtons.vue @@ -1,5 +1,5 @@