Leveraging PyOkera’s Usage of Client-Side Concurrency

PyOkera is our Python client library, which lets customers integrate their line-of-business (LOB) applications directly with an Okera cluster.  The library provides configuration settings that dictate how work is parallelized. Tuning these settings lets the client application use the compute power of both the Okera cluster and the client machine to maximize concurrency.  This can result in substantially better runtimes for a complex query.

Before describing these configuration settings, it is important to understand the high-level architecture of an Okera cluster.  For the purposes of this article, we can think of an Okera cluster as a group of machines with two node specializations:

  • Planner Node: A machine that receives a complex query (e.g. SELECT * FROM FOO WHERE ...), analyzes it, and splits the work into a set of tasks that can be executed in parallel. This is the brain of the system.
  • Worker Node: A machine that executes a task and returns the results to the caller. This is the workhorse of the system.
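
The split between the two node types can be pictured with a toy model. This is only an illustrative sketch, not Okera code: the plan and run_task functions and the row-range representation of a task are assumptions made for this example.

```python
def plan(row_count, rows_per_task):
    """Toy planner: split a table of row_count rows into independent tasks.

    Each task is a half-open (start, end) row range. This is a hypothetical
    representation chosen for illustration only.
    """
    return [(start, min(start + rows_per_task, row_count))
            for start in range(0, row_count, rows_per_task)]


def run_task(task):
    """Toy worker: 'scan' one row range and return its rows to the caller."""
    start, end = task
    return list(range(start, end))


# The planner produces the tasks; each task could run on a different worker.
tasks = plan(row_count=10, rows_per_task=4)

# The caller combines the per-task results into the full result set.
rows = [row for task in tasks for row in run_task(task)]
```

Because the tasks do not depend on each other, they can be executed in any order and on any worker, which is what makes the parallelism described in the rest of this article possible.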

A deployment typically consists of one or more Planner nodes and a number of Worker nodes.  A typical customer use case involves executing a complex SQL query across multiple worker nodes in an Okera cluster to ensure a reasonable SLA on the wall-clock runtime.  In an enterprise environment, this usually means the customer prefers to run large volumes of queries from different users concurrently, with predictable runtimes for all users, at the cost of less optimal scheduling for any individual query.

There are some scenarios where a client simply needs a faster turnaround time for a query.  For example, if a query is backing a web report, the web UI will require the query to be completed within a couple of seconds (anything longer makes the UI feel sluggish).  For this type of scenario, the client needs a way to execute the query while leveraging the maximum amount of cluster resources possible. In these cases the client will monopolize cluster resources for a brief period of time so that a query which would typically take a single worker 30s to process could be reduced to 1s.

Various degrees of the above can be achieved with PyOkera by tuning two input parameters of the plan and scan operations. To control the granularity of the work being performed, the plan() function accepts a max_task_count value (which can also be passed via the scan methods).  When this is unbounded, the plan() operation can return hundreds or thousands of individual tasks for a given query. Each of these tasks is enqueued, executed, aggregated and returned to the client during the scan operation, and some of this work is sequential.

PyOkera’s scan functions can parallelize the execution of these tasks when you specify max_client_process_count, the number of client-side worker processes to fork when executing a scan operation.  Each worker process takes tasks from the local work queue and executes them concurrently on an available worker in the cluster.  As a result, the client can use a substantial number of worker nodes at once. For example, a query that normally takes 60s to run sequentially can be reduced to just 3s on a 20-node cluster by passing max_task_count=60, max_client_process_count=20 to the scan methods (60 tasks spread over 20 processes is three rounds of roughly 1s each, assuming the tasks are evenly sized).

However, this type of improvement depends heavily on the shape of the underlying data being read. Also note that the scan methods already default max_client_process_count to 2 times the number of CPU cores on the client machine, so there is a default level of concurrency configured.
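
The 60s-to-3s arithmetic above can be reproduced with a small scheduling model. This is a rough sketch rather than a description of PyOkera internals: estimate_wall_clock is a hypothetical helper, and it assumes that each client process simply takes the next task as soon as it is free, ignoring planning, aggregation and network overhead.

```python
import heapq


def estimate_wall_clock(task_durations, process_count):
    """Estimate wall-clock time when process_count client processes drain
    a queue of tasks, each free process taking the next task in order.

    Hypothetical model for illustration; real scans also pay for planning,
    result aggregation and data transfer.
    """
    if process_count < 1:
        raise ValueError("process_count must be at least 1")
    # Each heap entry is the time at which one client process becomes free.
    free_at = [0.0] * min(process_count, max(len(task_durations), 1))
    heapq.heapify(free_at)
    for duration in task_durations:
        start = heapq.heappop(free_at)           # earliest free process
        heapq.heappush(free_at, start + duration)
    return max(free_at)


# 60 evenly sized tasks of 1s each, as in the example above.
print(estimate_wall_clock([1.0] * 60, process_count=1))   # 60.0
print(estimate_wall_clock([1.0] * 60, process_count=20))  # 3.0

# Skewed data: one 10s task dominates, no matter how many processes run.
print(estimate_wall_clock([10.0] + [1.0] * 50, process_count=20))  # 10.0
```

The last line shows why the shape of the data matters: when one task is much larger than the rest, adding client processes cannot bring the runtime below the duration of that task.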

Below is a high-level diagram that describes the planning and execution of a query via PyOkera.

To benchmark the effectiveness of this parallelization, I created a basic Python script that measures how long a scan operation takes with different values for max_client_process_count. The graph below shows actual data from a scan operation on a 1.5GB dataset stored in S3.

With this particular query we can see that setting the max_client_process_count to anything over 30 produces diminishing returns. This likely represents bandwidth limitations on the client machine. In a production environment, the optimal max_client_process_count would probably be between 8 and 15.

If you’d like to benchmark your own query with PyOkera, below is the script I used to generate the graph data. If you have any questions, feel free to contact me.

import sys

from okera import context

if sys.version_info[0] >= 3:
    # Python 3 specific imports
    from time import perf_counter
else:
    # Python 2 specific imports and defs
    from time import time as perf_counter


def get_conn(host_address, host_port):
    ctx = context()
    # If your cluster requires authentication, configure it on ctx here
    # before connecting.
    return ctx.connect(host=host_address, port=host_port)


def measure_scan(conn, sql, process_count):
    # Time a full scan with the given number of client-side processes.
    start_time = perf_counter()
    conn.scan_as_json(sql, max_client_process_count=process_count)
    duration = perf_counter() - start_time
    # Print one "process_count, seconds" line per run for graphing.
    print('{0}, {1}'.format(process_count, duration))


conn = get_conn("PLANNER_HOSTNAME", 12050)

# Double the process count on each run: 1, 2, 4, ..., 64.
process_count = 1
while process_count <= 64:
    measure_scan(conn, 'SELECT * FROM FOO', process_count)
    process_count = process_count * 2
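
To find the point of diminishing returns in your own numbers, the "process_count, duration" lines printed by the script can be post-processed. The sketch below is one possible approach, not part of PyOkera: parse_results and find_knee are hypothetical helpers, the 10% threshold is an arbitrary assumption, and the sample values are placeholders rather than measurements from this article.

```python
def parse_results(lines):
    """Parse 'process_count, duration' lines as printed by the benchmark."""
    results = []
    for line in lines:
        count, duration = line.split(",")
        results.append((int(count), float(duration)))
    return results


def find_knee(results, min_gain=0.10):
    """Return the first process count whose next step improves the runtime
    by less than min_gain (a fraction), or the largest count if none does.
    """
    for (count, duration), (_, next_duration) in zip(results, results[1:]):
        if (duration - next_duration) / duration < min_gain:
            return count
    return results[-1][0]


# Placeholder numbers for illustration only; NOT results from this article.
sample_output = ["1, 40.0", "2, 21.0", "4, 12.0", "8, 8.0", "16, 7.5", "32, 7.4"]
knee = find_knee(parse_results(sample_output))
print(knee)  # 8
```

With these placeholder values, going from 8 to 16 processes improves the runtime by only about 6%, so 8 is reported as the point beyond which extra processes stop paying off.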