31. Bulk Operations Pattern

Handling large-scale data operations efficiently with job-based processing, result tracking, and asynchronous polling

About this chapter

Handle large-scale operations that would time out in a single HTTP request by using background jobs for bulk processing, with progress tracking and result retrieval.

  • Bulk operation scenarios: Import, update, delete, export at scale
  • Job-based approach: Enqueueing work and returning job IDs immediately
  • Progress tracking: Storing operation progress and result counts
  • Result persistence: Saving job results for later retrieval
  • Polling patterns: Clients checking job status until completion
  • Error recovery: Handling partial failures and retries in bulk operations

Learning outcomes:

  • Design bulk operation endpoints that enqueue jobs
  • Implement job progress tracking and status updates
  • Create endpoints to retrieve job status and results
  • Handle partial failures in bulk operations
  • Design DTOs for bulk requests and responses
  • Provide clients with mechanisms to track long-running operations

31.1 When Bulk Operations Matter

Some operations involve processing hundreds or thousands of items. Running them synchronously inside a single HTTP request fails:

The Problem:

User: "Create 5,000 commands from CSV"
       ↓
API receives request
       ↓
HTTP request times out (commonly after 30–100 seconds, at the server or a proxy)
       ↓
User gets error
       ↓
Did it process? Who knows.

The Solution: Bulk Operations with Background Jobs

User: "Create 5,000 commands from CSV"
       ↓
API validates file, enqueues bulk job, returns job ID
       ↓
Returns 202 Accepted immediately
       ↓
Background worker processes items, tracks progress
       ↓
User polls for status or gets webhook callback
       ↓
Results available for retrieval
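Concretely, the enqueue endpoint can answer immediately with 202 Accepted and a job handle the client then uses for status checks (the ID value here is illustrative; the body shape matches the controller shown later in this chapter):

```http
HTTP/1.1 202 Accepted
Content-Type: application/json

{ "bulkOperationId": 42, "status": "Queued" }
```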

Common bulk scenarios:

  • Bulk create: Import CSV of commands
  • Bulk update: Apply configuration to 1,000 platforms
  • Bulk delete: Archive old commands created before 2023
  • Bulk export: Generate reports for 500 users
  • Data migration: Transform data format for all records

31.2 The Bulk Operation Model

Design a model to track bulk operations:

// Models/BulkOperation.cs
using System;
using System.Collections.Generic;

namespace CommandAPI.Models
{
    public class BulkOperation
    {
        public int Id { get; set; }
        public string OperationType { get; set; } // "CreateCommands", "UpdatePlatforms"
        public string Status { get; set; } // "Queued", "Processing", "Completed", "Failed"
        public int TotalItems { get; set; }
        public int ProcessedItems { get; set; }
        public int FailedItems { get; set; }
        public DateTime CreatedAt { get; set; }
        public DateTime? StartedAt { get; set; }
        public DateTime? CompletedAt { get; set; }
        public string? ErrorMessage { get; set; }
        
        // Navigation property to the per-item results
        public ICollection<BulkOperationResult> Results { get; set; } = new List<BulkOperationResult>();
    }

    public class BulkOperationResult
    {
        public int Id { get; set; }
        public int BulkOperationId { get; set; }
        public int? CreatedEntityId { get; set; } // ID of created command/platform/etc
        public string ItemInput { get; set; } // Original input (CSV row, JSON, etc)
        public bool Success { get; set; }
        public string? ErrorMessage { get; set; }
        public DateTime ProcessedAt { get; set; }

        public BulkOperation BulkOperation { get; set; }
    }
}

// Add to DbContext
public class AppDbContext : DbContext
{
    public DbSet<BulkOperation> BulkOperations { get; set; }
    public DbSet<BulkOperationResult> BulkOperationResults { get; set; }
    
    // ... other DbSets
}

31.3 Bulk Operation Repository

Create a repository to track bulk operations:

// Repositories/IBulkOperationRepository.cs
using System.Collections.Generic;
using System.Threading.Tasks;
using CommandAPI.Models;

namespace CommandAPI.Data
{
    public interface IBulkOperationRepository
    {
        Task<BulkOperation?> GetByIdAsync(int id);
        Task<IEnumerable<BulkOperation>> GetAllAsync();
        void CreateBulkOperation(BulkOperation operation);
        void UpdateBulkOperation(BulkOperation operation);
        void CreateResult(BulkOperationResult result);
        Task<IEnumerable<BulkOperationResult>> GetResultsAsync(int bulkOperationId);
        Task SaveChangesAsync();
    }
}

// Repositories/PgSqlBulkOperationRepository.cs
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using CommandAPI.Data;
using CommandAPI.Models;

namespace CommandAPI.Repositories
{
    public class PgSqlBulkOperationRepository : IBulkOperationRepository
    {
        private readonly AppDbContext _context;

        public PgSqlBulkOperationRepository(AppDbContext context)
        {
            _context = context;
        }

        public async Task<BulkOperation?> GetByIdAsync(int id)
        {
            return await _context.BulkOperations
                .Include(b => b.Results)
                .FirstOrDefaultAsync(b => b.Id == id);
        }

        public async Task<IEnumerable<BulkOperation>> GetAllAsync()
        {
            return await _context.BulkOperations
                .OrderByDescending(b => b.CreatedAt)
                .ToListAsync();
        }

        public void CreateBulkOperation(BulkOperation operation)
        {
            _context.BulkOperations.Add(operation);
        }

        public void UpdateBulkOperation(BulkOperation operation)
        {
            _context.BulkOperations.Update(operation);
        }

        public void CreateResult(BulkOperationResult result)
        {
            _context.BulkOperationResults.Add(result);
        }

        public async Task<IEnumerable<BulkOperationResult>> GetResultsAsync(int bulkOperationId)
        {
            return await _context.BulkOperationResults
                .Where(r => r.BulkOperationId == bulkOperationId)
                .OrderByDescending(r => r.ProcessedAt)
                .ToListAsync();
        }

        public async Task SaveChangesAsync()
        {
            await _context.SaveChangesAsync();
        }
    }
}

31.4 API Endpoint for Bulk Create

Create an endpoint that accepts bulk data and enqueues a job:

// DTOs/BulkCreateCommandDto.cs
using System.Collections.Generic;

namespace CommandAPI.Dtos
{
    public class BulkCreateCommandDto
    {
        public int PlatformId { get; set; }
        public List<CommandInput> Commands { get; set; }
    }

    public class CommandInput
    {
        public string HowTo { get; set; }
        public string CommandLine { get; set; }
    }
}

// Controllers/BulkCommandsController.cs
using System;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Logging;
using Hangfire;
using CommandAPI.Dtos;
using CommandAPI.Data;
using CommandAPI.Models;
using CommandAPI.Jobs;

namespace CommandAPI.Controllers
{
    [ApiController]
    [Route("api/bulk-commands")] // Literal route so the kebab-case URLs used by clients match
    public class BulkCommandsController : ControllerBase
    {
        private readonly IBulkOperationRepository _bulkRepository;
        private readonly IBackgroundJobClient _backgroundJobClient;
        private readonly ILogger<BulkCommandsController> _logger;

        public BulkCommandsController(
            IBulkOperationRepository bulkRepository,
            IBackgroundJobClient backgroundJobClient,
            ILogger<BulkCommandsController> logger)
        {
            _bulkRepository = bulkRepository;
            _backgroundJobClient = backgroundJobClient;
            _logger = logger;
        }

        [HttpPost("create-commands")]
        public async Task<ActionResult> CreateCommandsBulk(BulkCreateCommandDto dto)
        {
            if (dto.Commands == null || dto.Commands.Count == 0)
                return BadRequest("No commands provided");

            if (dto.Commands.Count > 10000)
                return BadRequest("Cannot process more than 10,000 items at once");

            // Create bulk operation record
            var bulkOperation = new BulkOperation
            {
                OperationType = "CreateCommands",
                Status = "Queued",
                TotalItems = dto.Commands.Count,
                ProcessedItems = 0,
                FailedItems = 0,
                CreatedAt = DateTime.UtcNow
            };

            _bulkRepository.CreateBulkOperation(bulkOperation);
            await _bulkRepository.SaveChangesAsync();

            // Enqueue the background job. Note: Hangfire serializes job
            // arguments into job storage, so for very large payloads prefer
            // the file-based approach in section 31.8.
            _backgroundJobClient.Enqueue<IBulkCreateCommandsJob>(
                job => job.ExecuteAsync(bulkOperation.Id, dto.PlatformId, dto.Commands));

            _logger.LogInformation(
                "Bulk create job enqueued. BulkOperationId: {BulkOperationId}, ItemCount: {ItemCount}",
                bulkOperation.Id, dto.Commands.Count);

            return Accepted(new { bulkOperationId = bulkOperation.Id, status = "Queued" });
        }

        [HttpGet("{id}")]
        public async Task<ActionResult> GetBulkOperation(int id)
        {
            var bulkOperation = await _bulkRepository.GetByIdAsync(id);
            if (bulkOperation == null)
                return NotFound();

            return Ok(new
            {
                bulkOperation.Id,
                bulkOperation.OperationType,
                bulkOperation.Status,
                Progress = new
                {
                    bulkOperation.ProcessedItems,
                    Total = bulkOperation.TotalItems,
                    Failed = bulkOperation.FailedItems,
                    Percentage = bulkOperation.TotalItems > 0 
                        ? Math.Round((decimal)bulkOperation.ProcessedItems / bulkOperation.TotalItems * 100, 2)
                        : 0
                },
                bulkOperation.CreatedAt,
                bulkOperation.StartedAt,
                bulkOperation.CompletedAt,
                bulkOperation.ErrorMessage
            });
        }

        [HttpGet("{id}/results")]
        public async Task<ActionResult> GetBulkResults(int id)
        {
            var results = await _bulkRepository.GetResultsAsync(id);
            return Ok(results);
        }
    }
}
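For an in-progress operation, the status endpoint above returns JSON shaped like the following (field names camel-cased by ASP.NET Core's default JSON options; values illustrative):

```json
{
  "id": 42,
  "operationType": "CreateCommands",
  "status": "Processing",
  "progress": { "processedItems": 1250, "total": 5000, "failed": 3, "percentage": 25.0 },
  "createdAt": "2024-01-15T10:00:00Z",
  "startedAt": "2024-01-15T10:00:05Z",
  "completedAt": null,
  "errorMessage": null
}
```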

31.5 The Bulk Processing Job

Implement the job that processes items one by one:

// Jobs/IBulkCreateCommandsJob.cs
using System.Collections.Generic;
using System.Threading.Tasks;
using CommandAPI.Dtos;

namespace CommandAPI.Jobs
{
    public interface IBulkCreateCommandsJob
    {
        Task ExecuteAsync(int bulkOperationId, int platformId, List<CommandInput> commands);
    }
}

// Jobs/BulkCreateCommandsJob.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using AutoMapper;
using CommandAPI.Data;
using CommandAPI.Dtos;
using CommandAPI.Models;

namespace CommandAPI.Jobs
{
    public class BulkCreateCommandsJob : IBulkCreateCommandsJob
    {
        private readonly ICommandRepository _commandRepository;
        private readonly IBulkOperationRepository _bulkRepository;
        private readonly IMapper _mapper;
        private readonly ILogger<BulkCreateCommandsJob> _logger;

        public BulkCreateCommandsJob(
            ICommandRepository commandRepository,
            IBulkOperationRepository bulkRepository,
            IMapper mapper,
            ILogger<BulkCreateCommandsJob> logger)
        {
            _commandRepository = commandRepository;
            _bulkRepository = bulkRepository;
            _mapper = mapper;
            _logger = logger;
        }

        public async Task ExecuteAsync(int bulkOperationId, int platformId, List<CommandInput> commands)
        {
            var bulkOperation = await _bulkRepository.GetByIdAsync(bulkOperationId);
            if (bulkOperation == null)
            {
                _logger.LogError("Bulk operation {BulkOperationId} not found", bulkOperationId);
                throw new InvalidOperationException($"Bulk operation {bulkOperationId} not found");
            }

            try
            {
                bulkOperation.Status = "Processing";
                bulkOperation.StartedAt = DateTime.UtcNow;
                _bulkRepository.UpdateBulkOperation(bulkOperation);
                await _bulkRepository.SaveChangesAsync();

                _logger.LogInformation("Starting bulk create operation {BulkOperationId} with {Count} items",
                    bulkOperationId, commands.Count);

                // Process items one by one
                foreach (var (commandInput, index) in commands.Select((cmd, idx) => (cmd, idx)))
                {
                    try
                    {
                        // Validate input
                        if (string.IsNullOrWhiteSpace(commandInput.HowTo) ||
                            string.IsNullOrWhiteSpace(commandInput.CommandLine))
                        {
                            throw new ArgumentException("HowTo and CommandLine are required");
                        }

                        // Create command
                        var command = new Command
                        {
                            PlatformId = platformId,
                            HowTo = commandInput.HowTo,
                            CommandLine = commandInput.CommandLine
                        };

                        _commandRepository.CreateCommand(command);
                        await _commandRepository.SaveChangesAsync();

                        // Record successful result
                        var result = new BulkOperationResult
                        {
                            BulkOperationId = bulkOperationId,
                            CreatedEntityId = command.Id,
                            ItemInput = System.Text.Json.JsonSerializer.Serialize(commandInput),
                            Success = true,
                            ProcessedAt = DateTime.UtcNow
                        };

                        _bulkRepository.CreateResult(result);

                        // Update progress (saving per item keeps status fresh;
                        // batch the saves for higher throughput on large sets)
                        bulkOperation.ProcessedItems++;
                        _bulkRepository.UpdateBulkOperation(bulkOperation);
                        await _bulkRepository.SaveChangesAsync();

                        _logger.LogInformation(
                            "Item {Index}/{Total} processed successfully for bulk operation {BulkOperationId}",
                            index + 1, commands.Count, bulkOperationId);
                    }
                    catch (Exception ex)
                    {
                        _logger.LogError(ex,
                            "Failed to process item {Index} in bulk operation {BulkOperationId}",
                            index + 1, bulkOperationId);

                        // Record failed result
                        var failedResult = new BulkOperationResult
                        {
                            BulkOperationId = bulkOperationId,
                            ItemInput = System.Text.Json.JsonSerializer.Serialize(commandInput),
                            Success = false,
                            ErrorMessage = ex.Message,
                            ProcessedAt = DateTime.UtcNow
                        };

                        _bulkRepository.CreateResult(failedResult);
                        bulkOperation.FailedItems++;
                        _bulkRepository.UpdateBulkOperation(bulkOperation);
                        await _bulkRepository.SaveChangesAsync();

                        // Continue processing other items instead of failing entirely
                    }
                }

                // Mark as completed
                bulkOperation.Status = "Completed";
                bulkOperation.CompletedAt = DateTime.UtcNow;
                _bulkRepository.UpdateBulkOperation(bulkOperation);
                await _bulkRepository.SaveChangesAsync();

                _logger.LogInformation(
                    "Bulk operation {BulkOperationId} completed. Processed: {Processed}, Failed: {Failed}",
                    bulkOperationId, bulkOperation.ProcessedItems, bulkOperation.FailedItems);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Bulk operation {BulkOperationId} failed with exception", bulkOperationId);

                bulkOperation.Status = "Failed";
                bulkOperation.ErrorMessage = ex.Message;
                bulkOperation.CompletedAt = DateTime.UtcNow;
                _bulkRepository.UpdateBulkOperation(bulkOperation);
                await _bulkRepository.SaveChangesAsync();

                throw;
            }
        }
    }
}

31.6 Polling for Results (Client Perspective)

Clients poll the status endpoint:

// JavaScript/TypeScript client example
async function pollBulkOperationStatus(bulkOperationId) {
    const maxAttempts = 120; // 10 minutes with 5-second intervals
    let attempt = 0;

    while (attempt < maxAttempts) {
        const response = await fetch(`/api/bulk-commands/${bulkOperationId}`);
        const operation = await response.json();

        console.log(`Progress: ${operation.progress.processedItems}/${operation.progress.total}`);

        if (operation.status === 'Completed' || operation.status === 'Failed') {
            return operation;
        }

        // Wait 5 seconds before polling again
        await new Promise(resolve => setTimeout(resolve, 5000));
        attempt++;
    }

    throw new Error('Bulk operation polling timeout');
}

// Usage
const bulkOp = await pollBulkOperationStatus(123);
if (bulkOp.status === 'Completed') {
    console.log(`Successfully created ${bulkOp.progress.processedItems} items`);
}
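Fixed five-second polling is simple, but exponential backoff reduces load on the status endpoint for long-running jobs. A sketch of a variant of the client above (the delay calculation is a pure function, so it is easy to test; the endpoint URL is the same as before):

```javascript
// Exponential backoff for status polling: 1s, 2s, 4s, ... capped at 30s.
// The delay calculation is a pure function, so it can be unit tested.
function backoffDelayMs(attempt, baseMs = 1000, maxMs = 30000) {
    return Math.min(baseMs * 2 ** attempt, maxMs);
}

async function pollWithBackoff(bulkOperationId, maxAttempts = 20) {
    for (let attempt = 0; attempt < maxAttempts; attempt++) {
        const response = await fetch(`/api/bulk-commands/${bulkOperationId}`);
        const operation = await response.json();

        if (operation.status === 'Completed' || operation.status === 'Failed') {
            return operation;
        }
        // Wait progressively longer between checks
        await new Promise(resolve => setTimeout(resolve, backoffDelayMs(attempt)));
    }
    throw new Error('Bulk operation polling timeout');
}
```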

31.7 Webhook Callbacks (Better Than Polling)

Instead of polling, notify clients via webhook when bulk operation completes:

// Models/WebhookSubscription.cs
using System;

namespace CommandAPI.Models
{
    public class WebhookSubscription
    {
        public int Id { get; set; }
        public string CallbackUrl { get; set; }
        public string EventType { get; set; } // "BulkOperationCompleted"
        public bool IsActive { get; set; }
        public DateTime CreatedAt { get; set; }
    }
}

// Services/IWebhookService.cs
using System.Threading.Tasks;

namespace CommandAPI.Services
{
    public interface IWebhookService
    {
        Task TriggerBulkOperationCompletedAsync(int bulkOperationId);
    }
}

// Services/WebhookService.cs
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Text.Json;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using CommandAPI.Data;

namespace CommandAPI.Services
{
    public class WebhookService : IWebhookService
    {
        private readonly IBulkOperationRepository _bulkRepository;
        private readonly HttpClient _httpClient;
        private readonly ILogger<WebhookService> _logger;

        public WebhookService(
            IBulkOperationRepository bulkRepository,
            HttpClient httpClient,
            ILogger<WebhookService> logger)
        {
            _bulkRepository = bulkRepository;
            _httpClient = httpClient;
            _logger = logger;
        }

        public async Task TriggerBulkOperationCompletedAsync(int bulkOperationId)
        {
            var bulkOperation = await _bulkRepository.GetByIdAsync(bulkOperationId);
            if (bulkOperation == null)
                return;

            var webhookData = new
            {
                eventType = "BulkOperationCompleted",
                bulkOperationId = bulkOperationId,
                operationType = bulkOperation.OperationType,
                status = bulkOperation.Status,
                processedItems = bulkOperation.ProcessedItems,
                failedItems = bulkOperation.FailedItems,
                completedAt = bulkOperation.CompletedAt,
                timestamp = DateTime.UtcNow
            };

            var content = new StringContent(
                JsonSerializer.Serialize(webhookData),
                System.Text.Encoding.UTF8,
                "application/json");

            try
            {
                // The callback URL is hard-coded for illustration; a real
                // implementation would load active WebhookSubscription rows
                // and POST to each subscriber's CallbackUrl.
                var response = await _httpClient.PostAsync(
                    "https://client-webhook-url.com/bulk-complete",
                    content);

                if (response.IsSuccessStatusCode)
                {
                    _logger.LogInformation(
                        "Webhook triggered successfully for bulk operation {BulkOperationId}",
                        bulkOperationId);
                }
                else
                {
                    _logger.LogWarning(
                        "Webhook returned {StatusCode} for bulk operation {BulkOperationId}",
                        response.StatusCode, bulkOperationId);
                }
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Failed to trigger webhook for bulk operation {BulkOperationId}",
                    bulkOperationId);
            }
        }
    }
}

Update the bulk job to call the webhook:

// In BulkCreateCommandsJob.cs: inject IWebhookService through the constructor
private readonly IWebhookService _webhookService;

// In ExecuteAsync, after marking as completed:
bulkOperation.Status = "Completed";
bulkOperation.CompletedAt = DateTime.UtcNow;
_bulkRepository.UpdateBulkOperation(bulkOperation);
await _bulkRepository.SaveChangesAsync();

// Notify subscribers
await _webhookService.TriggerBulkOperationCompletedAsync(bulkOperationId);

31.8 Handling Large Files (Streaming)

For CSV uploads with thousands of rows, stream them instead of loading all into memory:

// Controllers/BulkCommandsController.cs
using System.IO;

[HttpPost("create-commands-from-csv")]
public async Task<ActionResult> CreateCommandsFromCsv(int platformId, IFormFile csvFile)
{
    if (csvFile == null || csvFile.Length == 0)
        return BadRequest("CSV file is required");

    if (csvFile.Length > 50 * 1024 * 1024) // 50 MB limit
        return BadRequest("File too large. Max 50 MB");

    // Create bulk operation
    var bulkOperation = new BulkOperation
    {
        OperationType = "CreateCommandsFromCsv",
        Status = "Queued",
        TotalItems = 0, // Will update after reading CSV
        ProcessedItems = 0,
        FailedItems = 0,
        CreatedAt = DateTime.UtcNow
    };

    _bulkRepository.CreateBulkOperation(bulkOperation);
    await _bulkRepository.SaveChangesAsync();

    // Count rows first so progress percentages are meaningful
    using (var stream = csvFile.OpenReadStream())
    using (var reader = new StreamReader(stream))
    {
        int lineCount = 0;
        while (await reader.ReadLineAsync() != null)
        {
            lineCount++;
        }
        bulkOperation.TotalItems = Math.Max(0, lineCount - 1); // Exclude header row
    }

    _bulkRepository.UpdateBulkOperation(bulkOperation);
    await _bulkRepository.SaveChangesAsync();

    // Persist the upload to a temp file so the background worker can read it
    var filePath = Path.Combine(Path.GetTempPath(), $"bulk_{bulkOperation.Id}.csv");
    using (var fileStream = new FileStream(filePath, FileMode.Create))
    {
        await csvFile.CopyToAsync(fileStream);
    }

    _backgroundJobClient.Enqueue<IBulkCreateCommandsFromCsvJob>(
        job => job.ExecuteAsync(bulkOperation.Id, platformId, filePath));

    return Accepted(new { bulkOperationId = bulkOperation.Id });
}

31.9 Limiting Concurrent Bulk Operations

Prevent system overload by limiting concurrent jobs:

// Program.cs
builder.Services.AddHangfire(config =>
{
    config.UsePostgreSqlStorage(connectionString);
});

builder.Services.AddHangfireServer(options =>
{
    options.ServerName = "CommandAPI-Server";
    options.WorkerCount = 4; // Maximum 4 concurrent jobs
    options.Queues = new[] { "default", "bulk-operations" };
});

// Route bulk jobs to the isolated queue with Hangfire's [Queue] attribute.
// When enqueueing via an interface, decorate the interface method:
[Queue("bulk-operations")]
Task ExecuteAsync(int bulkOperationId, int platformId, List<CommandInput> commands);

31.10 Cleanup and Archiving

Bulk operations table grows over time. Archive old operations:

// Jobs/ICleanupBulkOperationsJob.cs
using System.Threading.Tasks;

namespace CommandAPI.Jobs
{
    public interface ICleanupBulkOperationsJob
    {
        Task ExecuteAsync();
    }
}

// Jobs/CleanupBulkOperationsJob.cs
using System;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using CommandAPI.Data;

namespace CommandAPI.Jobs
{
    public class CleanupBulkOperationsJob : ICleanupBulkOperationsJob
    {
        private readonly AppDbContext _context;
        private readonly ILogger<CleanupBulkOperationsJob> _logger;

        public CleanupBulkOperationsJob(AppDbContext context, ILogger<CleanupBulkOperationsJob> logger)
        {
            _context = context;
            _logger = logger;
        }

        public async Task ExecuteAsync()
        {
            try
            {
                // Archive bulk operations older than 30 days
                var cutoffDate = DateTime.UtcNow.AddDays(-30);
                var oldOperations = await _context.BulkOperations
                    .Where(b => b.CompletedAt < cutoffDate)
                    .ToListAsync();

                if (oldOperations.Count == 0)
                {
                    _logger.LogInformation("No old bulk operations to archive");
                    return;
                }

                // In real scenario, write to archive table or backup storage
                _logger.LogInformation("Archiving {Count} old bulk operations", oldOperations.Count);

                // Delete old results first (foreign key constraint)
                var oldResultIds = oldOperations.Select(b => b.Id).ToList();
                var resultsToDelete = await _context.BulkOperationResults
                    .Where(r => oldResultIds.Contains(r.BulkOperationId))
                    .ToListAsync();

                _context.BulkOperationResults.RemoveRange(resultsToDelete);
                _context.BulkOperations.RemoveRange(oldOperations);

                await _context.SaveChangesAsync();

                _logger.LogInformation("Successfully archived {Count} bulk operations and {ResultCount} results",
                    oldOperations.Count, resultsToDelete.Count);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Cleanup bulk operations job failed");
                throw;
            }
        }
    }
}

// Register as recurring job in Program.cs
RecurringJob.AddOrUpdate<ICleanupBulkOperationsJob>(
    "cleanup-bulk-operations",
    job => job.ExecuteAsync(),
    Cron.Daily(2)); // Run at 2 AM daily

31.11 Monitoring Bulk Operations

Track metrics for performance:

// Services/IBulkOperationMetricsService.cs
using System.Threading.Tasks;

namespace CommandAPI.Services
{
    public interface IBulkOperationMetricsService
    {
        Task RecordBulkOperationStartAsync(int bulkOperationId, int itemCount);
        Task RecordBulkOperationCompletedAsync(int bulkOperationId, int processedCount, int failedCount, long durationMs);
        Task RecordBulkOperationFailedAsync(int bulkOperationId, string errorType);
    }
}

// In the job, with IBulkOperationMetricsService injected as _metricsService
public async Task ExecuteAsync(int bulkOperationId, int platformId, List<CommandInput> commands)
{
    var startTime = DateTime.UtcNow;

    try
    {
        await _metricsService.RecordBulkOperationStartAsync(bulkOperationId, commands.Count);
        
        // ... processing logic ...
        
        var duration = (DateTime.UtcNow - startTime).TotalMilliseconds;
        await _metricsService.RecordBulkOperationCompletedAsync(
            bulkOperationId,
            bulkOperation.ProcessedItems,
            bulkOperation.FailedItems,
            (long)duration);
    }
    catch (Exception ex)
    {
        await _metricsService.RecordBulkOperationFailedAsync(bulkOperationId, ex.GetType().Name);
        throw;
    }
}

31.12 Best Practices for Bulk Operations

1. Always validate before enqueueing:

// Check file size, format, row count BEFORE creating job
if (commands.Count > MaxItemsPerBulkOperation)
    return BadRequest($"Maximum {MaxItemsPerBulkOperation} items per request");

2. Use transactions carefully:

// Wrong: One transaction for entire bulk operation
using var transaction = await _context.Database.BeginTransactionAsync();
// Process 5,000 items... // Takes forever, locks tables

// Right: Batch transactions
for (int i = 0; i < items.Count; i += 100)
{
    using var transaction = await _context.Database.BeginTransactionAsync();
    // Process 100 items
    await _context.SaveChangesAsync();
    await transaction.CommitAsync();
}

3. Provide partial success:

// Don't fail entire operation if one item fails
try
{
    await ProcessItem(item);
}
catch (Exception ex)
{
    // Record failure, continue with next item
    bulkOperation.FailedItems++;
}

4. Set appropriate timeouts:

// Don't let jobs run forever
[AutomaticRetry(Attempts = 3)]
[JobDisplayName("Bulk create operation {0}")] // {0} = bulkOperationId
public async Task ExecuteAsync(int bulkOperationId, int platformId, List<CommandInput> commands)
{
    using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(30));
    // Pass cts.Token to database and HTTP calls so work stops at the deadline
}

5. Clean up temporary resources:

// Delete uploaded files after processing
try
{
    // Process CSV file
}
finally
{
    if (File.Exists(csvFilePath))
        File.Delete(csvFilePath);
}

31.13 What’s Next

You now have:

  • ✓ Bulk operation models and repositories
  • ✓ API endpoints for enqueueing bulk jobs
  • ✓ Long-running job processing with progress tracking
  • ✓ Result storage for auditing
  • ✓ Polling and webhook-based status notifications
  • ✓ Large file streaming support
  • ✓ Concurrent job limiting
  • ✓ Cleanup and archiving strategies
  • ✓ Performance metrics

Part 11 Complete! You’ve mastered:

  • Background jobs with Hangfire
  • Job classes with DI and logging
  • Status tracking and retries
  • Bulk operations at scale

Next: Part 12 - Cross-Cutting Concerns: Logging, monitoring, security, and production readiness.