Streaming Reads with Python and Google Cloud Storage
In data processing, efficiency and reliability are paramount. As a data engineer, you’ll often need to read files in resource-constrained environments. One common approach is to stream a file and process it in smaller chunks. I recently came across a way to accomplish this using Google Cloud Storage (GCS), Python, and a CRC32C checksum (to verify the file’s integrity). Some reasons why this approach could be useful and why this post exists:
- Overcoming Documentation Gaps: While the Google Cloud Storage Python library documentation is good, it isn’t explicit about reading files as stream-like objects, which can keep developers from using streaming at all (a short sketch follows this list).
- Immediate Processing of Large Files: For sizeable files requiring sequential processing, like decompressing data and transferring it to databases such as Postgres, the ability to start work immediately, without waiting for the entire file to download, is a time saver.
- Minimal Resource Usage: Streaming files directly from GCS ensures minimal memory and disk usage. This is ideal when working with resource-constrained environments or aiming to maintain a lean data processing pipeline.
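To make that first point concrete: the streaming entry point is easy to miss, since Blob.open() returns a file-like object you can read in chunks, in contrast to the commonly documented full download. A minimal sketch, with placeholder bucket and object names:

from google.cloud import storage

client = storage.Client()
blob = client.bucket("my-bucket").blob("path/to/large-file.csv")  # placeholder names

# Common documented pattern: download the entire object before doing any work.
# blob.download_to_filename("/tmp/large-file.csv")

# Streaming alternative: open() returns a file-like object that is read in chunks.
with blob.open(mode="rb") as stream:
    first_chunk = stream.read(1024 * 1024)  # read just the first MiB without downloading the whole object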
Implementing the Approach
To get these benefits, I wrote a small Python snippet that builds on the Google Cloud Storage Python library. The Crc32cReader class wraps a file-like object and calculates the CRC32C digest as the file is read in chunks, which both verifies data integrity and keeps the handling of large GCS files efficient.
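The digest is built incrementally: each call takes the previous value as its starting point. Here is a minimal sketch of that primitive, assuming the third-party crc32c package (the same call signature the wrapper below relies on):

from crc32c import crc32c  # pip install crc32c

digest = 0
for chunk in (b"hello, ", b"world"):
    digest = crc32c(chunk, digest)  # feed the running value back in
print(hex(digest))  # same result as hashing all the bytes in one call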
Here’s the main code implementation:
import base64
import struct
from contextlib import contextmanager
from typing import IO

# Assumes the third-party "crc32c" package (pip install crc32c) for incremental CRC32C.
from crc32c import crc32c
from google.cloud import storage

# Size of each chunk read from GCS; 1 MiB is an arbitrary but reasonable default.
CHUNK_SIZE = 1024 * 1024


class Crc32cReader:
    """The Google Python client doesn't calculate the CRC32C digest when streaming a file for consumption.

    This wrapper provides a way to calculate the CRC32C digest as a file-like object is read in chunks.
    """

    def __init__(self, fileobj: IO[bytes]) -> None:
        self._fileobj: IO[bytes] = fileobj
        self.digest: int = 0

    def _update(self, chunk: bytes) -> None:
        """Given a chunk read from the file, update the running digest."""
        self.digest = crc32c(chunk, self.digest)

    def read(self, size: int = -1) -> bytes:
        chunk: bytes = self._fileobj.read(size)
        self._update(chunk)
        return chunk

    def read1(self, size: int = -1) -> bytes:
        return self.read(size)

    def hexdigest(self) -> str:
        """Return the digest of the hasher.

        The Base64-encoded CRC32C is in big-endian byte order.
        See https://cloud.google.com/storage/docs/hashes-etags
        """
        return base64.b64encode(struct.pack(">I", self.digest)).decode("utf-8")

@contextmanager
def read_file_as_stream(bucket_name: str, file_path: str):
    """Read a GCS blob as a stream.

    Args:
        bucket_name: The name of the bucket that contains the blob.
        file_path: The name of the blob.

    Yields:
        A file-like object whose CRC32C checksum is verified after it is fully read.
    """
    client = storage.Client()
    bucket = client.bucket(bucket_name)
    blob = bucket.blob(blob_name=file_path)
    # You must reload a blob to get its properties, such as the crc32c digest
    blob.reload()
    expected_checksum: str = blob.crc32c
    calculated_checksum = ""
    # Note: The Google library streams a file when open() is invoked on the blob itself
    with blob.open(mode="rb", chunk_size=CHUNK_SIZE) as blob_in:
        crc32c_reader = Crc32cReader(blob_in)
        yield crc32c_reader
        calculated_checksum = crc32c_reader.hexdigest()
    if calculated_checksum != expected_checksum:
        raise ValueError(
            f"Checksum mismatch. Expected {expected_checksum}, calculated {calculated_checksum}"
        )
The read_file_as_stream function can be used by any consumer that needs to read a file from GCS in a streaming fashion. Once the file has been read to the end, the calculated CRC32C digest is compared against the metadata stored in GCS, and an exception is raised if there is a mismatch caused by a transmission error or data corruption.
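For example, a consumer could decompress a gzipped export on the fly while the digest accumulates; note that the verification only fires if the stream is read through to the end. A minimal sketch, with placeholder bucket and object names:

import gzip

# Placeholder bucket/object names; assumes a gzip-compressed CSV export.
row_count = 0
with read_file_as_stream("my-bucket", "exports/events.csv.gz") as stream:
    with gzip.open(stream, mode="rt") as lines:
        for line in lines:
            row_count += 1  # or: stage rows for a Postgres COPY
print(f"Streamed and decompressed {row_count} rows")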
Conclusion
By streaming files, we can build more efficient, reliable, and resource-conscious data processing workflows. The approach is particularly beneficial for handling large files, ensuring data integrity, and keeping resource usage low.