Skip to content

Commit

Permalink
add retry
Browse files Browse the repository at this point in the history
  • Loading branch information
mattbr0wn authored and alexopenline committed Aug 23, 2024
1 parent d35e618 commit 14b8608
Show file tree
Hide file tree
Showing 2 changed files with 178 additions and 93 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,4 @@ go.work.sum
mailsherpa-*
*.csv
*.txt
validation_checkpoint.json
270 changes: 177 additions & 93 deletions bulkvalidate/bulkvalidate.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package bulkvalidate

import (
"bufio"
"encoding/csv"
"encoding/json"
"fmt"
"io"
"log"
"os"
"strconv"
"strings"
"sync"

"github.com/lucasepe/codename"
"github.com/schollz/progressbar/v3"
Expand All @@ -16,87 +20,117 @@ import (
"github.com/customeros/mailsherpa/mailvalidate"
)

func read_csv(filePath string) ([]string, error) {
// Open the CSV file
file, err := os.Open(filePath)
const (
batchSize = 10
checkpointFile = "validation_checkpoint.json"
)

type Checkpoint struct {
ProcessedRows int `json:"processedRows"`
}

func RunBulkValidation(inputFilePath, outputFilePath string) error {
checkpoint, err := loadCheckpoint()
if err != nil {
return nil, fmt.Errorf("error opening file: %w", err)
return fmt.Errorf("error loading checkpoint: %w", err)
}
defer file.Close()

// Create a new CSV reader
reader := csv.NewReader(file)
reader, file, err := read_csv(inputFilePath)
if err != nil {
return fmt.Errorf("error reading input file: %w", err)
}
defer file.Close()

// Read all records from the CSV
records, err := reader.ReadAll()
// Read and store the header
header, err := reader.Read()
if err != nil {
return nil, fmt.Errorf("error reading CSV: %w", err)
return fmt.Errorf("error reading header: %w", err)
}

// Check if the CSV has at least one row (header) and one column
if len(records) < 1 || len(records[0]) != 1 {
return nil, fmt.Errorf("invalid CSV format: expected 1 column with a header row")
// Skip to the last processed row
for i := 0; i < checkpoint.ProcessedRows; i++ {
_, err := reader.Read()
if err != nil {
if err == io.EOF {
break
}
return fmt.Errorf("error skipping to checkpoint: %w", err)
}
}

// Create a slice to store the data (excluding the header)
data := make([]string, 0, len(records)-1)
catchAllResults := make(map[string]bool)
bar := progressbar.Default(-1)

outputFileExists := fileExists(outputFilePath)

// Append each row (skipping the header) to the data slice
for _, row := range records[1:] {
data = append(data, row[0])
for {
batch, err := readBatch(reader, batchSize)
if err != nil {
return fmt.Errorf("error reading batch: %w", err)
}
if len(batch) == 0 {
break
}

results := processBatch(batch, catchAllResults)

err = writeResultsFile(results, outputFilePath, outputFileExists, header)
if err != nil {
return fmt.Errorf("error writing results: %w", err)
}

checkpoint.ProcessedRows += len(batch)
err = saveCheckpoint(checkpoint)
if err != nil {
return fmt.Errorf("error saving checkpoint: %w", err)
}

bar.Add(len(batch))
outputFileExists = true
}

return nil
}

func read_csv(filePath string) (*csv.Reader, *os.File, error) {
file, err := os.Open(filePath)
if err != nil {
return nil, nil, fmt.Errorf("error opening file: %w", err)
}

return data, nil
reader := csv.NewReader(bufio.NewReader(file))
return reader, file, nil
}

func writeResultsFile(results []run.VerifyEmailResponse, filePath string) error {
// Create or open the CSV file
file, err := os.Create(filePath)
func writeResultsFile(results []run.VerifyEmailResponse, filePath string, append bool, header []string) error {
flag := os.O_CREATE | os.O_WRONLY
if append {
flag |= os.O_APPEND
} else {
flag |= os.O_TRUNC
}

file, err := os.OpenFile(filePath, flag, 0644)
if err != nil {
return fmt.Errorf("error creating file: %w", err)
return fmt.Errorf("error opening file: %w", err)
}
defer file.Close()

// Create a new CSV writer
writer := csv.NewWriter(file)
defer writer.Flush()

// Write the header row
header := []string{
"Email",
"Username",
"Domain",
"IsValidSyntax",
"IsDeliverable",
"Provider",
"Firewall",
"IsRisky",
"IsFirewalled",
"IsFreeAccount",
"IsRoleAccount",
"IsMailboxFull",
"IsCatchAll",
"SmtpSuccess",
"SmtpRetry",
"SmtpResponseCode",
"SmtpErrorCode",
"SmtpDescription",
}

if err := writer.Write(header); err != nil {
return fmt.Errorf("error writing header: %w", err)
}

// Write each DomainResponse as a row in the CSV
if !append {
if err := writer.Write(header); err != nil {
return fmt.Errorf("error writing header: %w", err)
}
}

for _, resp := range results {
row := []string{
resp.Email,
resp.Syntax.User,
resp.Syntax.Domain,
resp.Email, resp.Syntax.User, resp.Syntax.Domain,
strconv.FormatBool(resp.Syntax.IsValid),
strconv.FormatBool(resp.IsDeliverable),
resp.Provider,
resp.Firewall,
resp.Provider, resp.Firewall,
strconv.FormatBool(resp.IsRisky),
strconv.FormatBool(resp.Risk.IsFirewalled),
strconv.FormatBool(resp.Risk.IsFreeAccount),
Expand All @@ -105,9 +139,7 @@ func writeResultsFile(results []run.VerifyEmailResponse, filePath string) error
strconv.FormatBool(resp.Risk.IsCatchAll),
strconv.FormatBool(resp.Smtp.Success),
strconv.FormatBool(resp.Smtp.Retry),
resp.Smtp.ResponseCode,
resp.Smtp.ErrorCode,
resp.Smtp.Description,
resp.Smtp.ResponseCode, resp.Smtp.ErrorCode, resp.Smtp.Description,
}
if err := writer.Write(row); err != nil {
return fmt.Errorf("error writing row: %w", err)
Expand All @@ -117,50 +149,97 @@ func writeResultsFile(results []run.VerifyEmailResponse, filePath string) error
return nil
}

func RunBulkValidation(inputFilePath, outputFilePath string) error {
testEmails, err := read_csv(inputFilePath)
if err != nil {
return fmt.Errorf("error reading input file: %w", err)
func readBatch(reader *csv.Reader, batchSize int) ([]string, error) {
var batch []string
for i := 0; i < batchSize; i++ {
record, err := reader.Read()
if err != nil {
if err == io.EOF {
// End of file reached
return batch, nil
}
if err == csv.ErrFieldCount {
// Skip records with incorrect field count
continue
}
// Return any other error
return batch, err
}
if len(record) > 0 {
batch = append(batch, record[0])
}
}
return batch, nil
}

catchAllResults := make(map[string]bool)
var output []run.VerifyEmailResponse
func processBatch(batch []string, catchAllResults map[string]bool) []run.VerifyEmailResponse {
var results []run.VerifyEmailResponse
var wg sync.WaitGroup
resultsChan := make(chan run.VerifyEmailResponse, len(batch))

bar := progressbar.Default(int64(len(testEmails)))
for _, email := range batch {
wg.Add(1)
go func(email string) {
defer wg.Done()
request := run.BuildRequest(email)
_, domain, _ := syntax.GetEmailUserAndDomain(email)
validateCatchAll := false
if _, exists := catchAllResults[domain]; !exists {
validateCatchAll = true
}
syntaxResults := mailvalidate.ValidateEmailSyntax(email)
domainResults, err := mailvalidate.ValidateDomain(request, validateCatchAll)
if err != nil {
log.Printf("Error: %s %s", email, err.Error())
}
emailResults, err := mailvalidate.ValidateEmail(request)
if err != nil {
log.Printf("Error: %s %s", email, err.Error())
}
isCatchAll := domainResults.IsCatchAll
if validateCatchAll {
catchAllResults[domain] = isCatchAll
} else {
isCatchAll = catchAllResults[domain]
}
results := run.BuildResponse(email, syntaxResults, domainResults, emailResults)
resultsChan <- results
}(email)
}

for _, email := range testEmails {
bar.Add(1)
request := run.BuildRequest(email)
wg.Wait()
close(resultsChan)

_, domain, _ := syntax.GetEmailUserAndDomain(email)
validateCatchAll := false
if _, exists := catchAllResults[domain]; !exists {
validateCatchAll = true
}
for result := range resultsChan {
results = append(results, result)
}

syntaxResults := mailvalidate.ValidateEmailSyntax(email)
domainResults, err := mailvalidate.ValidateDomain(request, validateCatchAll)
if err != nil {
log.Printf("Error: %s %s", email, err.Error())
}
emailResults, err := mailvalidate.ValidateEmail(request)
if err != nil {
log.Printf("Error: %s %s", email, err.Error())
}
return results
}

isCatchAll := domainResults.IsCatchAll
if validateCatchAll {
catchAllResults[domain] = isCatchAll
} else {
isCatchAll = catchAllResults[domain]
}
func loadCheckpoint() (Checkpoint, error) {
var checkpoint Checkpoint
file, err := os.Open(checkpointFile)
if os.IsNotExist(err) {
return checkpoint, nil
}
if err != nil {
return checkpoint, err
}
defer file.Close()

err = json.NewDecoder(file).Decode(&checkpoint)
return checkpoint, err
}

results := run.BuildResponse(email, syntaxResults, domainResults, emailResults)
output = append(output, results)
func saveCheckpoint(checkpoint Checkpoint) error {
file, err := os.Create(checkpointFile)
if err != nil {
return err
}
writeResultsFile(output, outputFilePath)
defer file.Close()

return nil
return json.NewEncoder(file).Encode(checkpoint)
}

func generateCatchAllUsername() string {
Expand All @@ -171,3 +250,8 @@ func generateCatchAllUsername() string {
name := codename.Generate(rng, 0)
return strings.ReplaceAll(name, "-", "")
}

func fileExists(filename string) bool {
_, err := os.Stat(filename)
return !os.IsNotExist(err)
}

0 comments on commit 14b8608

Please sign in to comment.