Skip to content

Commit

Permalink
fix(api, tre-api): copy from an external and internal bucket (#194)
Browse files Browse the repository at this point in the history
Co-authored-by: jaybeeelsdon <justin@chi.swan.ac.uk>
  • Loading branch information
mahadi99xy and jaybeeelsdon authored Oct 9, 2023
1 parent aceced9 commit 0984759
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 14 deletions.
49 changes: 39 additions & 10 deletions src/DARE-API/Services/ConsumeInternalMessageService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,32 @@
using BL.Models.Enums;
using DARE_API.Repositories.DbContexts;
using BL.Models.Tes;
using BL.Models.ViewModels;
using BL.Services;
using EasyNetQ.Management.Client.Model;
using Microsoft.EntityFrameworkCore.Metadata.Internal;
using static Microsoft.EntityFrameworkCore.DbLoggerCategory;
using System.Text.Json.Nodes;

namespace DARE_API.Services
{
public class ConsumeInternalMessageService : BackgroundService
{
private readonly IBus _bus;
private readonly ApplicationDbContext _dbContext;
private readonly MinioSettings _minioSettings;
private readonly IMinioHelper _minioHelper;
private readonly IDareClientHelper _clientHelper;


public ConsumeInternalMessageService(IBus bus , IServiceProvider serviceProvider)
public ConsumeInternalMessageService(IBus bus, IServiceProvider serviceProvider)
{
_bus = bus;
_dbContext = serviceProvider.CreateScope().ServiceProvider.GetRequiredService<ApplicationDbContext>(); ;

_dbContext = serviceProvider.CreateScope().ServiceProvider.GetRequiredService<ApplicationDbContext>();
_minioSettings = serviceProvider.CreateScope().ServiceProvider.GetRequiredService<MinioSettings>();
_minioHelper = serviceProvider.CreateScope().ServiceProvider.GetRequiredService<IMinioHelper>();
_clientHelper = serviceProvider.CreateScope().ServiceProvider.GetRequiredService<IDareClientHelper>();

}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
Expand All @@ -34,8 +46,8 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
}
catch (Exception e)
{
Log.Error("{Function} ConsumeProcessForm:- Failed to subscribe due to error: {e}","ExecuteAsync", e.Message);
Log.Error("{Function} ConsumeProcessForm:- Failed to subscribe due to error: {e}", "ExecuteAsync", e.Message);

}
}

Expand All @@ -47,6 +59,23 @@ private void Process(IMessage<int> message, MessageReceivedInfo info)

//TODO: Mahadi copy crate from external to local submission (if in external)

Uri uri = new Uri(sub.DockerInputLocation);
string fileName = Path.GetFileName(uri.LocalPath);

var messageMQ = new FetchFileMQ();
messageMQ.Url = sub.SourceCrate;
messageMQ.BucketName = sub.Project.SubmissionBucket;
messageMQ.Key = fileName;

if (uri.Host + ":" + uri.Port != _minioSettings.Url)
{
_minioHelper.RabbitExternalObject(messageMQ);

var minioEndpoint = _clientHelper.CallAPIWithoutModel<MinioEndpoint>("/api/Project/GetMinioEndPoint").Result;

messageMQ.Url = "http://" + minioEndpoint.Url + "/browser/" + messageMQ.BucketName + "/" + messageMQ.Key;
}

//TODO: Validate format of Crate

var dbproj = sub.Project;
Expand All @@ -59,7 +88,7 @@ private void Process(IMessage<int> message, MessageReceivedInfo info)
tres = trestr.Split('|').Select(x => x.ToLower()).ToList();
}



var dbtres = new List<BL.Models.Tre>();

Expand All @@ -75,12 +104,12 @@ private void Process(IMessage<int> message, MessageReceivedInfo info)
}
}
UpdateSubmissionStatus.UpdateStatus(sub, StatusType.WaitingForChildSubsToComplete, "");

foreach (var tre in dbtres)
{
_dbContext.Add(new Submission()
{
DockerInputLocation = tesTask.Executors.First().Image,
DockerInputLocation = messageMQ.Url,
Project = dbproj,
StartTime = DateTime.Now.ToUniversalTime(),
Status = StatusType.WaitingForAgentToTransfer,
Expand All @@ -99,7 +128,7 @@ private void Process(IMessage<int> message, MessageReceivedInfo info)
_dbContext.SaveChanges();
Log.Information("{Function} Processed sub for {id}", "Process", message.Body);


}
catch (Exception ex)
{
Expand All @@ -108,7 +137,7 @@ private void Process(IMessage<int> message, MessageReceivedInfo info)
}
}



private T ConvertByteArrayToType<T>(byte[] byteArray)
{
Expand Down
33 changes: 29 additions & 4 deletions src/TRE-API/DoAgentWork.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
using System.Threading.Tasks;
using TRE_API.Repositories.DbContexts;
using TRE_API.Services;
using BL.Services;
using Microsoft.AspNetCore.SignalR;

namespace TRE_API
{
Expand All @@ -39,11 +41,15 @@ public class DoAgentWork : IDoAgentWork
private readonly IServiceProvider _serviceProvider;
private readonly ApplicationDbContext _dbContext;
private readonly ISubmissionHelper _subHelper;
public DoAgentWork(IServiceProvider serviceProvider, ApplicationDbContext dbContext, ISubmissionHelper subHelper)
private readonly MinioSettings _minioSettings;
private readonly IMinioHelper _minioHelper;
public DoAgentWork(IServiceProvider serviceProvider, ApplicationDbContext dbContext, ISubmissionHelper subHelper, MinioSettings minioSettings, IMinioHelper minioHelper)
{
_serviceProvider = serviceProvider;
_dbContext = dbContext;
_subHelper = subHelper;
_minioSettings = minioSettings;
_minioHelper = minioHelper;
}

public void testing()
Expand Down Expand Up @@ -236,10 +242,10 @@ public void Execute()
{
Log.Information("Submission: {submission}", aSubmission);


//TODO: Put user approval check

// Check user is allowed ont he project
if ( ! _subHelper.IsUserApprovedOnProject(aSubmission.Project.Id, aSubmission.SubmittedBy.Id))
if (false && ! _subHelper.IsUserApprovedOnProject(aSubmission.Project.Id, aSubmission.SubmittedBy.Id))
{
Log.Error("User {UserID}/project {ProjectId} is not value for this submission {submission}", aSubmission.SubmittedBy.Id, aSubmission.Project.Id, aSubmission);
// record error with submission layer
Expand All @@ -248,7 +254,26 @@ public void Execute()
else
{


//TODO: Mahadi copy from submission input bucket to tre input bucket
try
{
Uri uri = new Uri(aSubmission.DockerInputLocation);
string fileName = Path.GetFileName(uri.LocalPath);
var sourceBucket = aSubmission.Project.SubmissionBucket;
var subProj = _dbContext.Projects.Where(x => x.SubmissionProjectId == aSubmission.Project.Id);
foreach (var proj in subProj)
{
var destinationBucket = proj.SubmissionBucketTre;
var copyResult = _minioHelper.CopyObject(_minioSettings, sourceBucket, destinationBucket, fileName, fileName);
}
}
catch (Exception ex)
{

throw;
}

// The TES message
var tesMessage = JsonConvert.DeserializeObject<TesTask>(aSubmission.TesJson);
var processedOK = true;
Expand Down

0 comments on commit 0984759

Please sign in to comment.