James Thomas

Notes on software.

Faster File Transfers With Serverless

This week I’ve been helping a client speed up file transfers between cloud object stores using serverless.

They had a 120GB file on a cloud provider’s object store. This needed copying into a different cloud object store for integration with platform services. Their current file transfer process was to download the file locally and then re-upload using a development machine. This was taking close to three hours due to bandwidth issues.

Having heard about the capabilities of serverless cloud platforms, they were wondering whether the massive parallelism that serverless provides could speed up that process. 🤔

After some investigating, I worked out a way to use serverless to implement concurrent file transfers. Transfer time was reduced from THREE HOURS to just FOUR MINUTES! This was a decrease in total transfer time of 98%. 👏👏👏

In this blog post, I’ll outline the simple steps I used to make this happen. I’ve been using IBM Cloud Functions as the serverless platform. Two different S3-compatible Object Stores were used for the file transfers. The approach should work for any object store with the features outlined below.

S3-Compatible API Features

Both object stores being used for the file transfers provided an S3-compatible API. The S3 API has two features that, when combined, enable concurrent file transfers: Range Reads and Multi-Part Transfers.

Range Reads

The HTTP/1.1 protocol defines a Range header which allows the client to retrieve part of a document. The client specifies a byte range using the header value, e.g. Range: bytes=0-499. The byte values are then returned in the HTTP response with a HTTP 206 status code. If the byte range is invalid, a HTTP 416 response is returned.
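
To make the status codes concrete, here is a minimal sketch of how a server might resolve a simple byte range against a file of known size. The `resolveRange` helper is hypothetical (it is not part of any S3 client) and only handles the plain `bytes=start-end` form with `start <= end`.

```javascript
// Illustrative only: resolve "bytes=start-end" against a resource of
// `size` bytes. `resolveRange` is a hypothetical helper for this post.
function resolveRange (header, size) {
  const match = /^bytes=(\d+)-(\d+)$/.exec(header)
  if (!match) throw new Error('sketch only handles bytes=start-end')

  const start = Number(match[1])
  const requestedEnd = Number(match[2])
  if (requestedEnd < start) throw new Error('sketch expects start <= end')

  // A range starting beyond the last byte cannot be satisfied.
  if (start >= size) return { status: 416, contentRange: `bytes */${size}` }

  // The end position is clamped to the last available byte.
  const end = Math.min(requestedEnd, size - 1)
  return {
    status: 206,
    contentRange: `bytes ${start}-${end}/${size}`,
    contentLength: end - start + 1
  }
}

console.log(resolveRange('bytes=0-499', 1000).contentLength) // 500
console.log(resolveRange('bytes=1000-1099', 1000).status) // 416
```

Both byte positions are inclusive, which is why `bytes=0-499` returns 500 bytes.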

The S3 API supports Range request headers on GET HTTP requests for object store files.

Sending a HTTP HEAD request for an object store file will return the file size (using the Content-Length header value). Creating ranges for fixed byte chunks up to this file size (0-1023, 1024-2047, 2048-3071 …) allows all sections of a file to be retrieved in parallel.

Multi-Part Transfers

Files are uploaded to buckets using HTTP PUT requests. A single PUT operation supports a maximum file size of 5GB. Uploading larger files is only possible using “Multi-Part” transfers.

Clients initiate a multi-part transfer using the API and are returned an upload identifier. The large file is then split into parts which are uploaded using individual HTTP PUT requests. The upload identifier is used to tag individual requests as belonging to the same file. Once all parts have been uploaded, the API is used to confirm the file is finished.

File parts do not have to be uploaded in consecutive order and multiple parts can be uploaded simultaneously.
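
The reason out-of-order uploads work is that parts are identified by number rather than arrival order. The sketch below uses a hypothetical in-memory class, `FakeMultipartUpload`, as a stand-in for what the object store does server-side. It is not a real client API, just an illustration of the idea.

```javascript
// Hypothetical in-memory stand-in for a multi-part upload. A real object
// store implements this behaviour on the server.
class FakeMultipartUpload {
  constructor () {
    this.parts = new Map()
  }

  // Parts are stored by part number, not by the order they arrive in.
  uploadPart (partNumber, body) {
    this.parts.set(partNumber, Buffer.from(body))
    return { PartNumber: partNumber }
  }

  // Completing the upload joins the parts in ascending part number order.
  complete () {
    const ordered = [...this.parts.keys()].sort((a, b) => a - b)
    return Buffer.concat(ordered.map(n => this.parts.get(n)))
  }
}

const upload = new FakeMultipartUpload()
upload.uploadPart(3, 'ld')
upload.uploadPart(1, 'hello ')
upload.uploadPart(2, 'wor')
console.log(upload.complete().toString()) // hello world
```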

Serverless File Transfers

Combining these two features, I was able to create a serverless function to copy a part of a file between source and destination buckets. By invoking thousands of these functions in parallel, the entire file could be copied between buckets in parallel streams. This was controlled by a local script used to manage the function invocations, monitor progress and complete the multi-part transfer once invocations had finished.

Serverless Function

The serverless function copies a file part between object stores. It is invoked with all the parameters needed to access both bucket files, along with the byte range to copy and the multi-part transfer identifier.

exports.main = async function main (params) {
  const { src_bucket, src_file, range, dest_bucket, dest_file, mpu, index} = params
  const byte_range = await read_range(src_bucket, src_file, range)
  const upload_result = await upload_part(dest_bucket, dest_file, mpu, index, byte_range)
  return upload_result
}

Read Source File Part

The S3-API JS client can create a “Range Read” request by passing the Range parameter with the byte range value, e.g. bytes=0-NN.

const read_range = async (Bucket, Key, Range) => {
  const file_range = await s3.getObject({Bucket, Key, Range}).promise()
  return file_range.Body
}

Upload File Part

The uploadPart method is used to upload a single part of a multi-part transfer. The method needs the UploadId created when initiating the multi-part transfer and the PartNumber for the chunk index. The ETag for the uploaded content will be returned.

const upload_part = async (Bucket, Key, UploadId, PartNumber, Body) => {
  const result = await s3.uploadPart({Bucket, Key, UploadId, PartNumber, Body}).promise()
  return result
}

Note: The uploadPart method does not support streaming Body values unless they come from the filesystem. This means the entire part has to be read into memory before uploading. The serverless function must have enough memory to handle this.

Local Script

The local script used to invoke the functions has to do the following things…

  • Create and complete the multi-part transfer
  • Calculate file part byte ranges for function input parameters
  • Copy file parts using concurrent function invocations

Create Multi-Part Transfers

The S3-API JS client can be used to create a new Multi-Part Transfer.

const { UploadId } = await s3.createMultipartUpload({Bucket: '...', Key: '...'}).promise()

The UploadId can then be used as an input parameter to the serverless function.

Create Byte Ranges

Source file sizes can be retrieved using the client library.

const file_size = async (Bucket, Key) => {
  const { ContentLength } = await s3.headObject({Bucket, Key}).promise()
  return ContentLength
}

This file size needs splitting into consecutive, fixed-size byte ranges. This function will return an array of the HTTP Range header values (bytes=N-M) needed.

const split_into_ranges = (bytes, range_mbs) => {
  const range_size = range_mbs * 1024 * 1024
  const ranges = []
  let range_offset = 0
  const last_byte_range = bytes - 1

  // Use <= so a final range holding only the last byte is not skipped.
  while (range_offset <= last_byte_range) {
    const start = range_offset
    // Last byte range may be less than chunk size where file size
    // is not an exact multiple of the chunk size.
    const end = start + Math.min((range_size - 1), last_byte_range - start)
    ranges.push(`bytes=${start}-${end}`)
    range_offset += range_size
  }

  return ranges
}
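
The boundary cases are easier to check by hand with tiny numbers. This sketch uses a hypothetical variant, `ranges_for`, that takes the chunk size in bytes rather than megabytes. It is not the function used for the transfer, just the same arithmetic at a readable scale.

```javascript
// Hypothetical variant working in bytes so small inputs can be verified
// by hand. Both positions in each range are inclusive.
const ranges_for = (bytes, chunk_bytes) => {
  const ranges = []
  for (let start = 0; start < bytes; start += chunk_bytes) {
    // The last range is shorter when the size is not a multiple of the chunk.
    const end = Math.min(start + chunk_bytes, bytes) - 1
    ranges.push(`bytes=${start}-${end}`)
  }
  return ranges
}

console.log(ranges_for(10, 4)) // [ 'bytes=0-3', 'bytes=4-7', 'bytes=8-9' ]
console.log(ranges_for(9, 4)) // [ 'bytes=0-3', 'bytes=4-7', 'bytes=8-8' ]
```

The second call shows the case worth testing: when exactly one byte is left over, it still needs its own range (bytes=8-8), otherwise the copied file ends up one byte short.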

Invoke Concurrent Functions

Serverless functions need to be invoked for each byte range calculated above. Depending on the file and chunk sizes used, the number of invocations needed could be larger than the platform’s concurrency rate limit (defaults to 1000 on IBM Cloud Functions). In the example above (120GB file in 100MB chunks), 1229 invocations would be needed.
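
The invocation count is just the file size divided by the chunk size, rounded up. The sketch below also checks the plan against the multi-part limits documented for AWS S3 (at most 10,000 parts, and at least 5MB per part except the last). Whether another S3-compatible store enforces the same limits is an assumption to verify, and `check_plan` is a hypothetical helper name.

```javascript
const MB = 1024 * 1024
const GB = 1024 * MB

// Limits documented for AWS S3 multi-part uploads. Treat them as
// assumptions for other S3-compatible object stores.
const MAX_PARTS = 10000
const MIN_PART_BYTES = 5 * MB

// Hypothetical helper: parts needed (one invocation each) plus limit checks.
const check_plan = (file_bytes, chunk_mbs) => {
  const parts = Math.ceil(file_bytes / (chunk_mbs * MB))
  return {
    parts,
    within_part_limit: parts <= MAX_PARTS,
    above_min_part_size: chunk_mbs * MB >= MIN_PART_BYTES
  }
}

console.log(check_plan(120 * GB, 100).parts) // 1229
console.log(check_plan(120 * GB, 10).within_part_limit) // false (12288 parts)
```

Chunk size is therefore a trade-off: smaller chunks mean more parallelism and less memory per function, but too many parts can exceed the store's part limit.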

Rather than executing all the byte ranges at once, the script needs to use a maximum of 1000 concurrent invocations. When initial invocations finish, additional functions can be invoked until all the byte ranges have been processed. This code snippet shows a solution to this issue (using IBM Cloud Functions JS SDK).

const parallel = require('async-await-parallel');
const retry = require('async-retry');
const openwhisk = require('openwhisk');

const concurrent = 1000
const retries = 3
const chunk_size = 100

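// Keys must match the parameter names the serverless function destructures.
const static_params = {
  src_bucket: source_bucket, src_file: source_filename,
  dest_bucket, dest_file: dest_filename, mpu
}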

const ow = openwhisk({...});

const bucket_file_size = await file_size(source_bucket, source_filename);
const ranges = split_into_ranges(bucket_file_size, chunk_size);

const uploads = ranges.map((range, index) => {
  const invoke = async () => {
    const params = Object.assign({range, index: index + 1}, static_params)
    const upload_result = await ow.actions.invoke({
      name: '...', blocking: true, result: true, params
    })
    return upload_result
  }

  // async-retry takes an options object as its second argument.
  return async () => retry(invoke, { retries })
})

const finished = await parallel(uploads, concurrent)

The uploads value is an array of lazily evaluated serverless function invocations. The code snippet uses the async-await-parallel library to limit the number of concurrent invocations. Intermittent invocation errors are handled using the async-retry library. Failed invocations will be retried three times.
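
For anyone curious what those two libraries are doing, here is a dependency-free sketch of the same pattern. `run_limited` and `with_retry` are hypothetical names for this post, not the libraries' APIs, and the "invocations" are simulated with timers rather than real function calls.

```javascript
// Run async thunks with at most `limit` in flight. Results keep the
// order of the input array. Hypothetical stand-in for a limiter library.
const run_limited = async (thunks, limit) => {
  const results = new Array(thunks.length)
  let next = 0

  const worker = async () => {
    while (next < thunks.length) {
      const index = next++ // claimed synchronously, so no two workers share a task
      results[index] = await thunks[index]()
    }
  }

  const workers = Array.from({ length: Math.min(limit, thunks.length) }, worker)
  await Promise.all(workers)
  return results
}

// Wrap an async function so it is retried up to `retries` extra times.
const with_retry = (fn, retries) => async () => {
  for (let attempt = 0; ; attempt++) {
    try {
      return await fn()
    } catch (err) {
      if (attempt >= retries) throw err
    }
  }
}

// Simulated invocations that record the peak number running at once.
let in_flight = 0
let peak = 0
const tasks = [1, 2, 3, 4, 5].map(n => with_retry(async () => {
  in_flight++
  peak = Math.max(peak, in_flight)
  await new Promise(resolve => setTimeout(resolve, 5))
  in_flight--
  return n * 10
}, 3))

const demo = run_limited(tasks, 2)
demo.then(results => console.log(results, peak)) // [ 10, 20, 30, 40, 50 ] 2
```

Each worker pulls the next task as soon as its current one finishes, so the limit stays saturated without ever being exceeded. A production version would also want a delay between retries.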

Finish Multi-Part Transfer

Once all parts have been uploaded, ETags (returned from the serverless invocations) and the Part Numbers are used to complete the multi-part transfer.

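// Each invocation result carries the ETag returned by uploadPart.
// Part numbers start at 1 and follow the order of the byte ranges.
const Parts = finished.map((part, idx) => ({
  ETag: part.ETag, PartNumber: idx + 1
}))

const { Location, Bucket, Key, ETag } = await s3.completeMultipartUpload({
  Bucket: '...', Key: '...', UploadId: '...', MultipartUpload: { Parts }
}).promise()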
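
Completing the transfer with a missing ETag leads to a confusing error from the object store, so it can be worth validating the results first. This is a small sketch of that check, assuming the results array is in the same order as the byte ranges. `build_parts` is a hypothetical helper, not part of the S3 client.

```javascript
// Hypothetical helper: build the Parts list from invocation results and
// fail fast if any result is missing its ETag.
const build_parts = (results) => results.map((result, idx) => {
  if (!result || !result.ETag) {
    throw new Error(`missing ETag for part ${idx + 1}`)
  }
  return { ETag: result.ETag, PartNumber: idx + 1 }
})

console.log(build_parts([{ ETag: '"abc"' }, { ETag: '"def"' }]))
```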

Results

The previous file transfer process (download locally and re-upload from development machine) was taking close to three hours. Counting both the download and the upload, this was an average throughput of 1.33GB per minute ((120GB * 2) / 180 minutes), or a little over 22MB/s.

Using serverless functions, the entire process was completed in FOUR MINUTES. File chunks of 100MB were transferred in parallel using 1229 function invocations. This was an average throughput of 60GB per minute ((120GB * 2) / 4 minutes), or roughly 1GB/s. That was a reduction in total transfer time of ~98%. 💯💯💯

Serverless makes it incredibly easy to run embarrassingly parallel workloads in the cloud. With just a few lines of code, the file transfer process can be parallelised using 1000s of concurrent functions. The client was rather impressed as you can imagine… 😎
