Streaming Data Processing with PySpark Streaming

Streaming data processing has existed in our computing lexicon for at least 50 years. The ideas Doug McIlroy presented in 1964 regarding what would become UNIX pipes have been revisited, reimagined and reengineered countless times. As of this writing the Apache Software Foundation has Samza, Spark and Storm for processing streaming data… and those are just the projects beginning with S! Since we use Spark and Python at Endgame, I was excited to try out the newly released PySpark Streaming API when it was announced for Apache Spark 1.2. I recently gave a talk on this at the Washington DC Area Apache Spark Interactive Meetup. The slides for the talk are available here. What follows in this blog post is an in-depth look at some PySpark functionality that early adopters might be interested in playing with.

 

USING UPDATESTATEBYKEY IN PYSPARK STREAMING

In the meetup slides, I present a rather convoluted method for calculating CPU percentage use from the Docker stats API using PySpark Streaming. updateStateByKey is a better way to calculate such information on a stream, but the Python documentation was a bit lacking. Also, the lack of type signatures can make PySpark programming a bit frustrating. To make sure my code worked, I took a cue from one of the attendees (thanks Jon) and did some test-driven development. TDD works so well that I would highly suggest it for your PySpark transforms, since you don’t have a type system protecting you from returning a tuple when you should be returning a list of tuples.

Let’s dig in. Here is the unit test for updateStateByKey from https://github.com/apache/spark/blob/master/python/pyspark/streaming/tests.py#L344-L359:

def test_update_state_by_key(self):

    def updater(vs, s):
        if not s:
            s = []
        s.extend(vs)
        return s

    input = [[('k', i)] for i in range(5)]

    def func(dstream):
        return dstream.updateStateByKey(updater)

    expected = [[0], [0, 1], [0, 1, 2], [0, 1, 2, 3], [0, 1, 2, 3, 4]]
    expected = [[('k', v)] for v in expected]
    self._test_func(input, func, expected)

Playing around with this test code a bit tells us that for the input:

[[('k', 0)], [('k', 1)], [('k', 2)], [('k', 3)], [('k', 4)]]

we expect the output:

[[('k', [0])],
 [('k', [0, 1])],
 [('k', [0, 1, 2])],
 [('k', [0, 1, 2, 3])],
 [('k', [0, 1, 2, 3, 4])]]
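
To see where that output comes from without starting Spark, here is a minimal pure-Python sketch of the per-batch behavior. It assumes updateStateByKey calls the updater once per key per batch with the list of new values and the previous state, and drops a key when the updater returns None. The helper simulate_update_state_by_key is hypothetical and is not part of PySpark.

```python
import copy


def simulate_update_state_by_key(batches, updater):
    """Hypothetical local stand-in for DStream.updateStateByKey (not a PySpark API)."""
    state = {}    # key -> current state
    outputs = []  # one list of (key, state) pairs per batch
    for batch in batches:
        # Group this batch's values by key.
        new_values = {}
        for key, value in batch:
            new_values.setdefault(key, []).append(value)
        # Call the updater for every key that has state or new values.
        for key in set(state) | set(new_values):
            result = updater(new_values.get(key, []), state.get(key))
            if result is None:
                state.pop(key, None)  # assumption: None removes the key
            else:
                state[key] = result
        # Snapshot the state, since the updater may mutate it in place.
        outputs.append(sorted((k, copy.deepcopy(v)) for k, v in state.items()))
    return outputs


def updater(vs, s):
    # Same updater as in the unit test: append the new values to the state.
    if not s:
        s = []
    s.extend(vs)
    return s


batches = [[('k', i)] for i in range(5)]
result = simulate_update_state_by_key(batches, updater)
# result[0] is [('k', [0])] and result[4] is [('k', [0, 1, 2, 3, 4])]
```

This is only a model of the semantics for reasoning about an updater; the real test harness (_test_func) runs the function on an actual DStream.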

 

updateStateByKey allows you to maintain state by key. This test is fine, but if you ran it in production you’d end up with an out of memory error, as s will grow without bound. In a unit test with a fixed input it’s fine, though. For my presentation, I wanted to pull out the time in nanoseconds that a given container had used the CPUs of my machine and divide it by the time in nanoseconds that the system CPU had used. For those of you thinking back to calculus, I want to take a derivative on a stream.
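
As a worked example of that ratio, here is a small sketch that computes the fraction of CPU time a container used between two samples. The field names container_ns and system_ns and the sample numbers are made up for illustration; they are not the actual keys returned by the Docker stats API.

```python
def cpu_fraction(prev, curr):
    """Container CPU time used per unit of system CPU time between two samples."""
    d_container = curr['container_ns'] - prev['container_ns']
    d_system = curr['system_ns'] - prev['system_ns']
    if d_system == 0:
        return None  # no system time elapsed, so the ratio is undefined
    return float(d_container) / d_system


# Two hypothetical cumulative counters, both in nanoseconds.
prev = {'container_ns': 1000000, 'system_ns': 50000000}
curr = {'container_ns': 3000000, 'system_ns': 90000000}

fraction = cpu_fraction(prev, curr)  # 2000000 / 40000000 = 0.05, i.e. 5%
```

Because both counters are cumulative, only the differences between consecutive samples matter, which is why this is a derivative of one counter with respect to the other.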

How do I do that and keep it continuous? Well, one idea is to keep a limited number of these delta x’s and delta y’s around and then calculate it. In the presentation slides, you’ll see that’s what I did by creating multiple DStreams, joining them, and taking differences in lambda functions. It was overly complicated, but it worked.
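
One simple way to keep only a limited amount of history, sketched here under my own assumptions rather than taken from the slides, is to trim the state inside the updater. MAX_SAMPLES is a hypothetical cap, and the loop below stands in for the successive batches of a single key.

```python
MAX_SAMPLES = 3  # hypothetical cap on how many values to keep per key


def bounded_updater(vs, s):
    """Append the new values, then keep only the most recent MAX_SAMPLES."""
    s = list(s or [])  # copy so the previous state is not mutated
    s.extend(vs)
    return s[-MAX_SAMPLES:]


state = None
history = []
for batch_values in [[0], [1], [2], [3], [4]]:
    state = bounded_updater(batch_values, state)
    history.append(state)
# history is [[0], [0, 1], [0, 1, 2], [1, 2, 3], [2, 3, 4]]
```

Unlike the updater in the unit test, the state here never holds more than MAX_SAMPLES values, so it does not grow with the length of the stream.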

In this blog I want to present a different idea that I cooked up after the meetup. First the code:

from itertools import tee

def test_complex_state_by_key(self):

    def pairwise(iterable):
        "s -> (s0,s1), (s1,s2), (s2, s3), ..."
        a, b = tee(iterable)
        next(b, None)
        return zip(a, b)  # zip works on Python 2 and 3 (izip is Python 2 only)

    def derivative(s,x,y):
        "({'x':2,'y':1},{'x':6,'y':2}) -> derivative(_,'x','y') -> float(1)/4 -> 0.25"
        return float(s[1][y] - s[0][y])/(s[1][x]-s[0][x])

    def updater(vs, s): # vs is the list of new values for this key in the batch, s is the previous state
        if s and 'lv' in s:
            _input = [s['lv']] + vs # prepend the last value so pairs span batches
        else:
            _input = vs
        d = [derivative(p,'x','y') for p in pairwise(_input)]
        if s and 'd' in s:
            d = s['d'] + d
        last_value = vs[-1]
        if len(d) > len(_input):
            d = d[-len(_input):] # trim to length of _input (slice, not a single element)
        state = {'d':d,'lv':last_value}
        return state

    input = [[('k',{'x':2,'y':1})],[('k',{'x':3,'y':2})],[('k',{'x':5,'y':3})]]

    def func(dstream):
        return dstream.updateStateByKey(updater)

    expected = [[('k', {'d': [], 'lv': {'x': 2, 'y': 1}})],
                [('k', {'d': [1.0], 'lv': {'x': 3, 'y': 2}})],
                [('k', {'d': [1.0, 0.5], 'lv': {'x': 5, 'y': 3}})]]
    self._test_func(input, func, expected)

Here’s an explanation of what I’m trying to do here. I pulled in the pairwise function from the itertools recipe page. Then I crafted a very specific derivative method that takes a pair of dictionaries and two key names and returns the slope of the line between them. Rise over run. You can plug this code into the PySpark streaming tests and it passes. It can be used as an unoptimized recipe for keeping a continuous stream of derivatives, although I can imagine a few nice changes for usability/speed. The state keeps d, which is the list of derivatives between consecutive pairs of the input, and lv, which is the last value of the data stream. That should allow this to work on a continuous stream of values. Integrating this into the demo I did in the presentation is left as an exercise for the reader. ;)
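
As one possible direction for those changes, here is a hedged sketch of a variant updater that caps the number of stored derivatives, skips pairs with zero run, and tolerates a batch with no new values. These choices and the names slope, capped_updater and MAX_DERIVATIVES are my own assumptions for illustration. The plain loop stands in for successive batches of one key.

```python
from itertools import tee

MAX_DERIVATIVES = 2  # hypothetical cap, kept small so the trimming is visible


def pairwise(iterable):
    "s -> (s0,s1), (s1,s2), (s2, s3), ..."
    a, b = tee(iterable)
    next(b, None)
    return zip(a, b)


def slope(p, x, y):
    """Rise over run between a pair of dicts, or None when the run is zero."""
    run = p[1][x] - p[0][x]
    if run == 0:
        return None
    return float(p[1][y] - p[0][y]) / run


def capped_updater(vs, s):
    s = s or {}
    if not vs:
        return s or None  # nothing new: keep the old state, or no state at all
    # Prepend the last value seen so that pairs span batch boundaries.
    points = ([s['lv']] if 'lv' in s else []) + list(vs)
    new = [slope(p, 'x', 'y') for p in pairwise(points)]
    new = [d for d in new if d is not None]
    d = (s.get('d', []) + new)[-MAX_DERIVATIVES:]
    return {'d': d, 'lv': vs[-1]}


batches = [[{'x': 2, 'y': 1}], [{'x': 3, 'y': 2}],
           [{'x': 5, 'y': 3}], [{'x': 6, 'y': 5}]]
state = None
states = []
for vs in batches:
    state = capped_updater(vs, state)
    states.append(state)
# states[-1] is {'d': [0.5, 2.0], 'lv': {'x': 6, 'y': 5}}
```

The first three states match the expected values in the test above; the fourth batch shows the oldest derivative being dropped once the cap is reached.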

Comments, questions, code review welcome at @rseymour. If you find these sorts of problems and their applications to the diverse world of cyber security interesting, you might like to work with the data science team here at Endgame.