Streaming, Serialization, and IPC

Writing and Reading Streams

Arrow defines two types of binary formats for serializing record batches:

  • Streaming format: for sending an arbitrary-length sequence of record batches. The format must be processed from start to end and does not support random access.

  • File or Random Access format: for serializing a fixed number of record batches. It supports random access, which makes it very useful in combination with memory maps.

To follow this section, make sure to first read the section on Memory and IO.

Using streams

First, let’s create a small record batch:

In [1]: import pyarrow as pa

In [2]: data = [
   ...:     pa.array([1, 2, 3, 4]),
   ...:     pa.array(['foo', 'bar', 'baz', None]),
   ...:     pa.array([True, None, False, True])
   ...: ]
   ...: 

In [3]: batch = pa.record_batch(data, names=['f0', 'f1', 'f2'])

In [4]: batch.num_rows
Out[4]: 4

In [5]: batch.num_columns
Out[5]: 3

Now, we can begin writing a stream containing some number of these batches. For this we use RecordBatchStreamWriter, which can write to a writeable NativeFile object or a writeable Python object. For convenience, this one can be created with new_stream():

In [6]: sink = pa.BufferOutputStream()

In [7]: writer = pa.ipc.new_stream(sink, batch.schema)

Here we used an in-memory Arrow buffer stream, but this could have been a socket or some other IO sink.

When creating the RecordBatchStreamWriter, we pass the schema, since the schema (column names and types) must be the same for all of the batches sent in this particular stream. Now we can do:

In [8]: for i in range(5):
   ...:    writer.write_batch(batch)
   ...: 

In [9]: writer.close()

In [10]: buf = sink.getvalue()

In [11]: buf.size
Out[11]: 1984

Now buf contains the complete stream as an in-memory byte buffer. We can read such a stream with RecordBatchStreamReader or the convenience function pyarrow.ipc.open_stream:

In [12]: reader = pa.ipc.open_stream(buf)

In [13]: reader.schema
Out[13]: 
f0: int64
f1: string
f2: bool

In [14]: batches = [b for b in reader]

In [15]: len(batches)
Out[15]: 5

We can check the returned batches are the same as the original input:

In [16]: batches[0].equals(batch)
Out[16]: True

An important point is that if the input source supports zero-copy reads (for example, a memory map or pyarrow.BufferReader), then the returned batches are also zero-copy and do not allocate any new memory on read.

Writing and Reading Random Access Files

The RecordBatchFileWriter has the same API as RecordBatchStreamWriter. You can create one with new_file():

In [17]: sink = pa.BufferOutputStream()

In [18]: writer = pa.ipc.new_file(sink, batch.schema)

In [19]: for i in range(10):
   ....:    writer.write_batch(batch)
   ....: 

In [20]: writer.close()

In [21]: buf = sink.getvalue()

In [22]: buf.size
Out[22]: 4226

The difference between RecordBatchFileReader and RecordBatchStreamReader is that the input source for the file reader must have a seek method for random access, whereas the stream reader only requires read operations. We can use the open_file() function to open a file:

In [23]: reader = pa.ipc.open_file(buf)

Because we have access to the entire payload, we know the number of record batches in the file, and can read any at random:

In [24]: reader.num_record_batches
Out[24]: 10

In [25]: b = reader.get_batch(3)

In [26]: b.equals(batch)
Out[26]: True

Reading from Stream and File Format for pandas

The stream and file reader classes have a special read_pandas method to simplify reading multiple record batches and converting them to a single DataFrame output:

In [27]: df = pa.ipc.open_file(buf).read_pandas()

In [28]: df[:5]
Out[28]: 
   f0    f1     f2
0   1   foo   True
1   2   bar   None
2   3   baz  False
3   4  None   True
4   1   foo   True

Arbitrary Object Serialization

In pyarrow we are able to serialize and deserialize many kinds of Python objects. While not a complete replacement for the pickle module, these functions can be significantly faster, particularly when dealing with collections of NumPy arrays.

Warning

While the functions in this section utilize the Arrow stream protocol internally, they do not produce data that is compatible with the above ipc.open_file and ipc.open_stream functions.

As an example, consider a dictionary containing NumPy arrays:

In [29]: import numpy as np

In [30]: data = {
   ....:     i: np.random.randn(500, 500)
   ....:     for i in range(100)
   ....: }
   ....: 

We use the pyarrow.serialize function to convert this data to a byte buffer:

In [31]: buf = pa.serialize(data).to_buffer()

In [32]: type(buf)
Out[32]: pyarrow.lib.Buffer

In [33]: buf.size
Out[33]: 200028928

pyarrow.serialize creates an intermediate object which can be converted to a buffer (the to_buffer method) or written directly to an output stream.

pyarrow.deserialize converts a buffer-like object back to the original Python object:

In [34]: restored_data = pa.deserialize(buf)

In [35]: restored_data[0]
Out[35]: 
array([[-0.2732196 , -0.96730411, -1.13444673, ...,  0.26441927,
        -1.00286731,  0.42657124],
       [-1.14558367,  1.91042112, -0.01789659, ..., -0.34401617,
         0.16879513,  0.71165074],
       [ 0.00191341, -0.22191242,  0.9230292 , ..., -0.09576649,
        -1.46427909, -1.54222813],
       ...,
       [ 1.52417771, -0.90615156, -0.55178046, ..., -0.42481714,
        -0.32656278,  0.08375228],
       [-1.32693958, -0.04843527,  0.96283578, ..., -1.49964164,
        -0.48840049,  1.23726467],
       [ 0.97790138, -0.33759338,  0.39595975, ...,  1.27943566,
         0.42137313, -0.1656567 ]])

When dealing with NumPy arrays, pyarrow.deserialize can be significantly faster than pickle because the resulting arrays are zero-copy references into the input buffer. The larger the arrays, the larger the performance savings.

Consider this example. For pyarrow.deserialize, we have:

In [36]: %timeit restored_data = pa.deserialize(buf)
4.03 ms +- 185 us per loop (mean +- std. dev. of 7 runs, 100 loops each)

And for pickle:

In [37]: import pickle

In [38]: pickled = pickle.dumps(data)

In [39]: %timeit unpickled_data = pickle.loads(pickled)
54.8 ms +- 5.04 ms per loop (mean +- std. dev. of 7 runs, 10 loops each)

We aspire to make these functions a high-speed alternative to pickle for transient serialization in Python big data applications.

Serializing Custom Data Types

If an unrecognized data type is encountered when serializing an object, pyarrow will fall back on using pickle for converting that type to a byte string. There may be a more efficient way, though.

Consider a class with two members, one of which is a NumPy array:

class MyData:
    def __init__(self, name, data):
        self.name = name
        self.data = data

We write functions to convert this to and from a dictionary with simpler types:

def _serialize_MyData(val):
    return {'name': val.name, 'data': val.data}

def _deserialize_MyData(data):
    return MyData(data['name'], data['data'])
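Before involving pyarrow at all, it can help to confirm that the two helpers are inverses of each other. The sketch below repeats the class and helpers so that it runs on its own; the sample name and array are arbitrary.

```python
import numpy as np


class MyData:
    def __init__(self, name, data):
        self.name = name
        self.data = data


def _serialize_MyData(val):
    # Reduce the object to a dictionary of simpler types.
    return {'name': val.name, 'data': val.data}


def _deserialize_MyData(data):
    # Rebuild the object from the dictionary produced above.
    return MyData(data['name'], data['data'])


original = MyData('example', np.arange(6).reshape(2, 3))
restored = _deserialize_MyData(_serialize_MyData(original))
print(restored.name, restored.data.shape)
```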

Then, we must register these functions in a SerializationContext so that MyData can be recognized:

context = pa.SerializationContext()
context.register_type(MyData, 'MyData',
                      custom_serializer=_serialize_MyData,
                      custom_deserializer=_deserialize_MyData)

Lastly, we use this context as an additional argument to pyarrow.serialize:

buf = pa.serialize(val, context=context).to_buffer()
restored_val = pa.deserialize(buf, context=context)

The SerializationContext also has convenience methods serialize and deserialize, so these are equivalent statements:

buf = context.serialize(val).to_buffer()
restored_val = context.deserialize(buf)

Component-based Serialization

For serializing Python objects containing some number of NumPy arrays, Arrow buffers, or other data types, it may be desirable to transport their serialized representation without having to produce an intermediate copy using the to_buffer method. To motivate this, suppose we have a list of NumPy arrays:

In [40]: import numpy as np

In [41]: data = [np.random.randn(10, 10) for i in range(5)]

The call pa.serialize(data) does not copy the memory inside each of these NumPy arrays. This serialized representation can then be decomposed into a dictionary containing a sequence of pyarrow.Buffer objects, which hold the metadata for each array and references to the memory inside the arrays. To do this, use the to_components method:

In [42]: serialized = pa.serialize(data)

In [43]: components = serialized.to_components()

The particular details of the output of to_components are not too important. The objects in the 'data' field are pyarrow.Buffer objects, which are zero-copy convertible to Python memoryview objects:

In [44]: memoryview(components['data'][0])
Out[44]: <memory at 0x7f3cf337cae0>

A memoryview can be converted back to an Arrow Buffer with pyarrow.py_buffer:

In [45]: mv = memoryview(components['data'][0])

In [46]: buf = pa.py_buffer(mv)

An object can be reconstructed from its component-based representation using deserialize_components:

In [47]: restored_data = pa.deserialize_components(components)

In [48]: restored_data[0]
Out[48]: 
array([[-1.45777590e+00,  1.54582516e+00, -1.52039464e+00,
        -1.38348489e+00,  2.20253595e+00, -3.13153773e-01,
         1.29007173e+00,  9.81478510e-01, -1.55678358e-01,
         2.46125493e-03],
       [ 8.53849695e-03,  1.36933867e+00, -1.31371094e+00,
         1.08633787e+00, -4.41987877e-01, -3.32265394e-01,
        -3.90818926e-01,  1.16999365e+00, -4.49043178e-01,
        -8.64589131e-02],
       [ 2.10320045e-01,  8.78942224e-01, -8.17585427e-01,
        -2.55650942e+00, -1.22161972e-01,  1.65020989e-01,
        -8.11051589e-01,  1.64823540e-01,  3.61397675e-01,
        -8.31898951e-02],
       [ 9.65060786e-01,  3.46918341e-02,  1.13556865e+00,
        -2.77346036e-01,  2.13278698e+00, -1.99053614e+00,
         1.48798847e-01, -8.61891592e-01, -1.27085114e+00,
        -4.92234978e-01],
       [-6.27103666e-01, -5.87633655e-02,  1.04846407e+00,
         5.62871836e-01, -5.82192495e-01, -1.43817179e+00,
         3.25120064e-03, -2.17968621e+00,  1.46898637e-01,
         4.24617321e-02],
       [ 5.39916553e-01,  6.06250454e-01, -1.12709927e+00,
        -4.71029784e-01,  3.25697927e-01,  6.93231318e-01,
         2.73634539e+00,  6.59453764e-01,  7.17496045e-01,
         3.92111666e-01],
       [ 6.97050401e-01,  2.98092597e-01,  1.18045244e+00,
         4.29070019e-01, -7.69178415e-02,  2.17147417e-01,
        -1.04413298e+00,  1.24718042e+00, -2.75983454e+00,
         1.23987305e+00],
       [-7.43305388e-01, -1.54000093e+00,  9.87609959e-01,
        -1.49830513e+00, -2.10544696e-01, -7.98734836e-01,
         5.39735715e-01,  4.81497036e-01,  9.03444181e-01,
         2.04046217e-01],
       [-2.16859938e+00,  8.12337726e-01,  8.79783955e-01,
         3.28718307e-01,  1.40179887e+00, -1.37398079e+00,
        -1.61113991e-02,  5.70380798e-01,  5.46040359e-01,
         5.24690858e-01],
       [ 1.76658896e-01, -6.94678842e-01,  3.44672597e-01,
        -1.25153532e+00, -8.11050590e-01,  1.12049880e+00,
        -4.54767144e-01,  7.92050863e-01,  6.88263131e-01,
         3.88395275e-01]])

deserialize_components is also available as a method on SerializationContext objects.

Serializing pandas Objects

The default serialization context has optimized handling of pandas objects like DataFrame and Series. Combined with component-based serialization above, this enables zero-copy transport of pandas DataFrame objects not containing any Python objects:

In [49]: import pandas as pd

In [50]: df = pd.DataFrame({'a': [1, 2, 3, 4, 5]})

In [51]: context = pa.default_serialization_context()

In [52]: serialized_df = context.serialize(df)

In [53]: df_components = serialized_df.to_components()

In [54]: original_df = context.deserialize_components(df_components)

In [55]: original_df
Out[55]: 
   a
0  1
1  2
2  3
3  4
4  5