Skip to content

Commit

Permalink
Integrate latest report generation Docker image into site (#207)
Browse files Browse the repository at this point in the history
  • Loading branch information
bcspragu authored Jul 30, 2024
1 parent cd18e5c commit fc78323
Show file tree
Hide file tree
Showing 15 changed files with 334 additions and 58 deletions.
8 changes: 4 additions & 4 deletions WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,10 @@ oci_pull(

oci_pull(
name = "runner_base",
digest = "sha256:d0b2922dc48cb6acb7c767f89f0c92ccbe1a043166971bac0b585b3851a9b720",
# TODO(#44): Replace this base image with a more permanent one.
image = "docker.io/curfewreplica/pactatest",
# platforms = ["linux/amd64"],
# This digest is of the nightly/main tag as of 2024-07-22
digest = "sha256:7adec544294b5cb9e11c6bb4c43d0b2de646e5f933639f86c85f3f03c99f650e",
image = "ghcr.io/rmi-pacta/workflow.pacta.webapp",
platforms = ["linux/amd64"],
)

oci_pull(
Expand Down
248 changes: 210 additions & 38 deletions async/async.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ type Config struct {
Blob Blob
PubSub *publisher.Client
Logger *zap.Logger

BenchmarkDir string
PACTADataDir string
}

func (c *Config) validate() error {
Expand All @@ -43,6 +46,12 @@ func (c *Config) validate() error {
return errors.New("no logger given")
}

if c.BenchmarkDir == "" {
return errors.New("no benchmark dir specified")
}
if c.PACTADataDir == "" {
return errors.New("no PACTA data dir specified")
}
return nil
}

Expand All @@ -56,6 +65,9 @@ type Handler struct {
blob Blob
pubsub *publisher.Client
logger *zap.Logger

// Mounted directories with data needed for report generation.
benchmarkDir, pactaDataDir string
}

func New(cfg *Config) (*Handler, error) {
Expand All @@ -64,9 +76,11 @@ func New(cfg *Config) (*Handler, error) {
}

return &Handler{
blob: cfg.Blob,
pubsub: cfg.PubSub,
logger: cfg.Logger,
blob: cfg.Blob,
pubsub: cfg.PubSub,
logger: cfg.Logger,
benchmarkDir: cfg.BenchmarkDir,
pactaDataDir: cfg.PACTADataDir,
}, nil
}

Expand Down Expand Up @@ -221,31 +235,169 @@ func (h *Handler) CreateAudit(ctx context.Context, taskID task.ID, req *task.Cre
return errors.New("not implemented")
}

func (h *Handler) CreateReport(ctx context.Context, taskID task.ID, req *task.CreateReportRequest, reportContainer string) error {
fileNames := []string{}
for _, blobURI := range req.BlobURIs {
// Load the parsed portfolio from blob storage, place it in /mnt/
// processed_portfolios, where the `create_report.R` script expects it
// to be.
fileNameWithExt := filepath.Base(string(blobURI))
if !strings.HasSuffix(fileNameWithExt, ".json") {
return fmt.Errorf("given blob wasn't a JSON-formatted portfolio, %q", fileNameWithExt)
// ReportInputPortfolio is the "portfolio" object nested inside ReportInput.
type ReportInputPortfolio struct {
	// Files is the name of the portfolio file the report should be built from.
	Files string `json:"files"`
	// HoldingsDate is a date string for the portfolio's holdings, e.g. "2023-12-31".
	HoldingsDate string `json:"holdingsDate"`
	// Name is the display name of the portfolio.
	Name string `json:"name"`
}

// ReportInput is the top-level JSON document describing a report run. It
// serializes as {"portfolio": {...}, "inherit": "..."}.
type ReportInput struct {
	Portfolio ReportInputPortfolio `json:"portfolio"`
	// Inherit presumably names a base parameter set to inherit from (e.g.
	// "GENERAL_2023Q4") — TODO confirm against the report workflow's docs.
	Inherit string `json:"inherit"`
}

// ReportEnv captures the set of directories used by a single report run.
type ReportEnv struct {
	// rootDir is the parent directory of every per-run input and output
	// subdirectory.
	rootDir string

	// benchmarksDir and pactaDataDir are mounted in from externally.
	benchmarksDir, pactaDataDir string
}

// initReportEnv prepares a fresh on-disk workspace for one report run.
//
// benchmarkDir and pactaDataDir are recorded as given; they are not created or
// checked here. baseDir is created if it does not exist, and a uniquely named
// directory (prefix "create-report") is created beneath it to hold this run's
// input and output subdirectories.
//
// On success the returned ReportEnv is rooted at that new directory. On
// failure it returns nil and a wrapped error; a run directory that was already
// created is removed again so failed attempts do not pile up under baseDir.
func initReportEnv(benchmarkDir, pactaDataDir, baseDir string) (*ReportEnv, error) {
	// Make sure the base directory exists first.
	if err := os.MkdirAll(baseDir, 0700); err != nil {
		return nil, fmt.Errorf("failed to create base input dir: %w", err)
	}
	// We create temp subdirectories, because while this code currently executes in
	// a new container for each invocation, that might not always be the case.
	rootDir, err := os.MkdirTemp(baseDir, "create-report")
	if err != nil {
		return nil, fmt.Errorf("failed to create temp dir for input CSVs: %w", err)
	}

	re := &ReportEnv{
		rootDir:       rootDir,
		benchmarksDir: benchmarkDir,
		pactaDataDir:  pactaDataDir,
	}

	if err := re.makeDirectories(); err != nil {
		// os.MkdirTemp leaves removal to the caller, and nobody else holds a
		// reference to rootDir once we return nil, so clean it up here.
		if rmErr := os.RemoveAll(rootDir); rmErr != nil {
			return nil, fmt.Errorf("failed to create directories: %w (additionally failed to clean up %q: %v)", err, rootDir, rmErr)
		}
		return nil, fmt.Errorf("failed to create directories: %w", err)
	}

	return re, nil
}

// ReportDir is the name of a subdirectory within a report run's root
// directory.
type ReportDir string

// Input directories.
const (
	PortfoliosDir ReportDir = "portfolios"
	RealEstateDir ReportDir = "real-estate"
	ScoreCardDir  ReportDir = "score-card"
	SurveyDir     ReportDir = "survey"
)

// Output directories.
const (
	AnalysisOutputDir ReportDir = "analysis-output"
	ReportOutputDir   ReportDir = "report-output"
	SummaryOutputDir  ReportDir = "summary-output"
)

// outputDirs returns the full paths of the analysis, report and summary
// output directories, in that order.
func (r *ReportEnv) outputDirs() []string {
	outputs := [...]ReportDir{AnalysisOutputDir, ReportOutputDir, SummaryOutputDir}

	paths := make([]string, 0, len(outputs))
	for _, out := range outputs {
		paths = append(paths, r.pathForDir(out))
	}
	return paths
}

// asEnvVars renders the environment's directory layout as KEY=value strings:
// first the two externally provided directories, then one entry per
// subdirectory of the run's root (inputs followed by outputs).
func (r *ReportEnv) asEnvVars() []string {
	perRun := []struct {
		prefix string
		dir    ReportDir
	}{
		{"PORTFOLIO_DIR=", PortfoliosDir},
		{"REAL_ESTATE_DIR=", RealEstateDir},
		{"SCORE_CARD_DIR=", ScoreCardDir},
		{"SURVEY_DIR=", SurveyDir},
		{"ANALYSIS_OUTPUT_DIR=", AnalysisOutputDir},
		{"REPORT_OUTPUT_DIR=", ReportOutputDir},
		{"SUMMARY_OUTPUT_DIR=", SummaryOutputDir},
	}

	vars := make([]string, 0, 2+len(perRun))
	vars = append(vars,
		"BENCHMARKS_DIR="+r.benchmarksDir,
		"PACTA_DATA_DIR="+r.pactaDataDir,
	)
	for _, e := range perRun {
		vars = append(vars, e.prefix+r.pathForDir(e.dir))
	}
	return vars
}

// pathForDir returns the path of subdirectory d beneath the environment's
// root directory.
func (r *ReportEnv) pathForDir(d ReportDir) string {
	sub := string(d)
	return filepath.Join(r.rootDir, sub)
}

// makeDirectories creates every input and output subdirectory under the
// environment's root directory. It stops doing work at the first failure and
// returns that error; it returns nil when all directories were created.
func (r *ReportEnv) makeDirectories() error {
// rErr holds the first error hit by makeDir. Once it is set, every later
// makeDir call returns immediately, so the call list below needs no
// per-call error checks.
var rErr error
makeDir := func(reportDir ReportDir) {
if rErr != nil {
return
}
// NOTE(review): the next four lines reference names that are not in scope in
// this closure (fileNames, fileNameWithExt, h, ctx, blobURI); they look like
// removed-side lines of the old CreateReport body captured by the diff view —
// confirm against the committed file.
fileNames = append(fileNames, strings.TrimSuffix(fileNameWithExt, ".json"))
destPath := filepath.Join("/", "mnt", "processed_portfolios", fileNameWithExt)
if err := h.downloadBlob(ctx, string(blobURI), destPath); err != nil {
return fmt.Errorf("failed to download processed portfolio blob: %w", err)
// os.Mkdir (not MkdirAll) is used, so a directory that already exists is
// reported as an error rather than silently reused.
dir := r.pathForDir(reportDir)
if err := os.Mkdir(dir, 0700); err != nil {
rErr = fmt.Errorf("failed to create dir %q: %w", dir, err)
return
}
}

// NOTE(review): the next three lines also appear to be removed-side diff
// residue from the old CreateReport (the hard-coded /mnt/reports directory) —
// confirm against the committed file.
reportDir := filepath.Join("/", "mnt", "reports")
if err := os.MkdirAll(reportDir, 0600); err != nil {
return fmt.Errorf("failed to create directory for reports to get copied to: %w", err)
// Inputs
makeDir(PortfoliosDir)
makeDir(RealEstateDir) // Used as part of specific projects, empty for now.
makeDir(ScoreCardDir) // Used as part of specific projects, empty for now.
makeDir(SurveyDir) // Used as part of specific projects, empty for now.

// Outputs
makeDir(AnalysisOutputDir)
makeDir(ReportOutputDir)
makeDir(SummaryOutputDir)

// rErr is still nil here only if every makeDir call above succeeded.
if rErr != nil {
return rErr
}
return nil
}

func (h *Handler) CreateReport(ctx context.Context, taskID task.ID, req *task.CreateReportRequest, reportContainer string) error {
if n := len(req.BlobURIs); n != 1 {
return fmt.Errorf("expected exactly one blob URI as input, got %d", n)
}
blobURI := req.BlobURIs[0]

// We use this instead of /mnt/... because the base image (quite
// reasonably) uses a non-root user, so we can't be creating directories in the
// root filesystem all willy nilly.
baseDir := filepath.Join("/", "home", "workflow-pacta-webapp")

reportEnv, err := initReportEnv(h.benchmarkDir, h.pactaDataDir, baseDir)
if err != nil {
return fmt.Errorf("failed to init report env: %w", err)
}

// Load the parsed portfolio from blob storage, place it in our PORTFOLIO_DIR,
// where the `run_pacta.R` script expects it to be.
fileNameWithExt := filepath.Base(string(blobURI))
if !strings.HasSuffix(fileNameWithExt, ".csv") {
return fmt.Errorf("given blob wasn't a CSV-formatted portfolio, %q", fileNameWithExt)
}
destPath := filepath.Join(reportEnv.pathForDir(PortfoliosDir), fileNameWithExt)
if err := h.downloadBlob(ctx, string(blobURI), destPath); err != nil {
return fmt.Errorf("failed to download processed portfolio blob: %w", err)
}

inp := ReportInput{
Portfolio: ReportInputPortfolio{
Files: fileNameWithExt,
HoldingsDate: "2023-12-31", // TODO(#206)
Name: "FooPortfolio", // TODO(#206)
},
Inherit: "GENERAL_2023Q4", // TODO(#206): Should this be configurable
}

cmd := exec.CommandContext(ctx, "/usr/local/bin/Rscript", "/app/create_report.R")
var inpJSON bytes.Buffer
if err := json.NewEncoder(&inpJSON).Encode(inp); err != nil {
return fmt.Errorf("failed to encode report input as JSON: %w", err)
}

cmd := exec.CommandContext(ctx,
"/usr/local/bin/Rscript",
"--vanilla", "/workflow.pacta.webapp/inst/extdata/scripts/run_pacta.R",
inpJSON.String())

cmd.Env = append(cmd.Env, reportEnv.asEnvVars()...)
cmd.Env = append(cmd.Env,
"PORTFOLIO="+strings.Join(fileNames, ","),
"LOG_LEVEL=DEBUG",
"HOME=/root", /* Required by pandoc */
)
cmd.Stdout = os.Stdout
Expand All @@ -255,23 +407,20 @@ func (h *Handler) CreateReport(ctx context.Context, taskID task.ID, req *task.Cr
return fmt.Errorf("failed to run pacta test CLI: %w", err)
}

// Download outputs from /out and upload them to Azure
dirEntries, err := os.ReadDir(reportDir)
if err != nil {
return fmt.Errorf("failed to read report directory: %w", err)
}

var artifacts []*task.AnalysisArtifact
for _, dirEntry := range dirEntries {
if !dirEntry.IsDir() {
continue
}
dirPath := filepath.Join(reportDir, dirEntry.Name())
tmp, err := h.uploadDirectory(ctx, dirPath, reportContainer)
uploadDir := func(dir string) error {
aas, err := h.uploadDirectory(ctx, dir, reportContainer, req.AnalysisID)
if err != nil {
return fmt.Errorf("failed to upload report directory: %w", err)
}
artifacts = tmp
artifacts = append(artifacts, aas...)
return nil
}

for _, outDir := range reportEnv.outputDirs() {
if err := uploadDir(outDir); err != nil {
return fmt.Errorf("failed to upload artifacts %q: %w", outDir, err)
}
}

events := []publisher.Event{
Expand Down Expand Up @@ -331,7 +480,7 @@ func (h *Handler) downloadBlob(ctx context.Context, srcURI, destPath string) err
return nil
}

func (h *Handler) uploadDirectory(ctx context.Context, dirPath, container string) ([]*task.AnalysisArtifact, error) {
func (h *Handler) uploadDirectory(ctx context.Context, dirPath, container string, analysisID pacta.AnalysisID) ([]*task.AnalysisArtifact, error) {
base := filepath.Base(dirPath)

var artifacts []*task.AnalysisArtifact
Expand All @@ -341,14 +490,14 @@ func (h *Handler) uploadDirectory(ctx context.Context, dirPath, container string
}

// This is a file, let's upload it to the container
uri := blob.Join(h.blob.Scheme(), container, base, strings.TrimPrefix(path, dirPath+"/"))
uri := blob.Join(h.blob.Scheme(), container, string(analysisID), base, strings.TrimPrefix(path, dirPath+"/"))
if err := h.uploadBlob(ctx, path, uri); err != nil {
return fmt.Errorf("failed to upload blob: %w", err)
}

fn := filepath.Base(path)
// Returns pacta.FileType_UNKNOWN for unrecognized extensions, which we'll serve as binary blobs.
ft := fileTypeFromExt(filepath.Ext(fn))
ft := fileTypeFromFilename(fn)
if ft == pacta.FileType_UNKNOWN {
h.logger.Error("unhandled file extension", zap.String("dir", dirPath), zap.String("file_ext", filepath.Ext(fn)))
}
Expand All @@ -365,7 +514,9 @@ func (h *Handler) uploadDirectory(ctx context.Context, dirPath, container string
return artifacts, nil
}

func fileTypeFromExt(ext string) pacta.FileType {
func fileTypeFromFilename(fn string) pacta.FileType {
ext := filepath.Ext(fn)

switch ext {
case ".csv":
return pacta.FileType_CSV
Expand All @@ -383,8 +534,29 @@ func fileTypeFromExt(ext string) pacta.FileType {
return pacta.FileType_CSS
case ".js":
return pacta.FileType_JS
case ".map":
switch ext2 := filepath.Ext(strings.TrimSuffix(fn, ext)); ext2 {
case ".js":
return pacta.FileType_JS_MAP
default:
return pacta.FileType_UNKNOWN
}
case ".ttf":
return pacta.FileType_TTF
case ".woff":
return pacta.FileType_WOFF
case ".woff2":
return pacta.FileType_WOFF2
case ".eot":
return pacta.FileType_EOT
case ".svg":
return pacta.FileType_SVG
case ".png":
return pacta.FileType_PNG
case ".jpg":
return pacta.FileType_JPG
case ".pdf":
return pacta.FileType_TTF
default:
return pacta.FileType_UNKNOWN
}
Expand Down
3 changes: 3 additions & 0 deletions cmd/runner/configs/local.conf
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,6 @@ azure_topic_location centralus-1

azure_storage_account rmipactalocal
azure_report_container reports

benchmark_dir /mnt/workflow-data/benchmarks/2023Q4_20240529T002355Z
pacta_data_dir /mnt/workflow-data/pacta-data/2023Q4_20240424T120055Z
11 changes: 8 additions & 3 deletions cmd/runner/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ func run(args []string) error {
var (
env = fs.String("env", "", "The environment we're running in.")

benchmarkDir = fs.String("benchmark_dir", "", "The path to the benchmark data for report generation")
pactaDataDir = fs.String("pacta_data_dir", "", "The path to the PACTA data for report generation")

azEventTopic = fs.String("azure_event_topic", "", "The EventGrid topic to send notifications when tasks have finished")
azTopicLocation = fs.String("azure_topic_location", "", "The location (like 'centralus-1') where our EventGrid topics are hosted")

Expand Down Expand Up @@ -80,9 +83,11 @@ func run(args []string) error {
}

h, err := async.New(&async.Config{
Blob: blobClient,
PubSub: pubsubClient,
Logger: logger,
Blob: blobClient,
PubSub: pubsubClient,
Logger: logger,
BenchmarkDir: *benchmarkDir,
PACTADataDir: *pactaDataDir,
})
if err != nil {
return fmt.Errorf("failed to init async biz logic handler: %w", err)
Expand Down
Loading

0 comments on commit fc78323

Please sign in to comment.