Overview

Ballista is a distributed compute platform primarily implemented in Rust, using Apache Arrow as the memory model. It is built on an architecture that allows other programming languages to be supported as first-class citizens without paying a serialization penalty.

The foundational technologies in Ballista are:

  • Apache Arrow memory model and compute kernels for efficient processing of data.
  • Apache Arrow Flight Protocol for efficient data transfer between processes.
  • Google Protocol Buffers for serializing query plans.
  • Docker for packaging up executors along with user-defined code.

Ballista can be deployed in Kubernetes, or as a standalone cluster using etcd for discovery.

Architecture

The following diagram highlights some of the integrations that will be possible with this unique architecture. Note that not all components shown here are available yet.

Ballista Architecture Diagram

How does this compare to Apache Spark?

Although Ballista is largely inspired by Apache Spark, there are some key differences.

  • The choice of Rust as the main execution language means that memory usage is deterministic and avoids the overhead of GC pauses.
  • Ballista is designed from the ground up to use columnar data, enabling a number of efficiencies such as vectorized processing (SIMD and GPU) and efficient compression. Although Spark does have some columnar support, it is still largely row-based today.
  • The combination of Rust and Arrow provides excellent memory efficiency and memory usage can be 5x - 10x lower than Apache Spark in some cases, which means that more processing can fit on a single node, reducing the overhead of distributed compute.
  • The use of Apache Arrow as the memory model and network protocol means that data can be exchanged between executors in any programming language with minimal serialization overhead.
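The difference between row-based and columnar layout described above can be sketched in plain Rust. This is a hypothetical illustration (no Arrow dependency; the struct names are invented for the example): in the columnar layout each field's values are contiguous in memory, which is what makes vectorized (SIMD) processing and compression effective.

```rust
// Row-oriented: one struct per record; fields of different records are
// interleaved in memory.
struct TripRow {
    passenger_count: i64,
    fare_amount: f64,
}

// Column-oriented: one contiguous buffer per field, as in Arrow.
struct TripColumns {
    passenger_count: Vec<i64>,
    fare_amount: Vec<f64>,
}

fn sum_fares_rows(rows: &[TripRow]) -> f64 {
    // Each access strides over the full record.
    rows.iter().map(|r| r.fare_amount).sum()
}

fn sum_fares_columns(cols: &TripColumns) -> f64 {
    // Summing a contiguous slice is trivially auto-vectorizable.
    cols.fare_amount.iter().sum()
}

fn main() {
    let rows = vec![
        TripRow { passenger_count: 1, fare_amount: 10.5 },
        TripRow { passenger_count: 2, fare_amount: 7.0 },
    ];
    let cols = TripColumns {
        passenger_count: vec![1, 2],
        fare_amount: vec![10.5, 7.0],
    };
    // Both layouts hold the same data and produce the same answer.
    assert_eq!(sum_fares_rows(&rows), sum_fares_columns(&cols));
}
```

Arrow standardizes the columnar layout across languages, which is why executors written in different languages can exchange buffers without conversion.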

Deployment

There are currently two main methods of deploying a Ballista cluster.

  1. Docker Compose
  2. Kubernetes

Installing Ballista with Docker Compose

Docker Compose is a convenient way to launch executors when testing locally. This example docker-compose.yaml demonstrates how to start a Ballista Rust executor and how to mount a data volume into the container.

version: '2.0'
services:
  etcd:
    image: quay.io/coreos/etcd:v3.4.9
    command: "etcd -advertise-client-urls http://etcd:2379 -listen-client-urls http://0.0.0.0:2379"
    ports:
      - "2379:2379"
  ballista-rust:
    image: ballistacompute/ballista-rust:0.3.0
    command: "/executor --mode etcd --etcd-urls etcd:2379 --external-host 0.0.0.0 --port 50051 --concurrent-tasks=2"
    ports:
      - "50051:50051"
    volumes:
      - /mnt:/mnt

With the above content saved to a docker-compose.yaml file, the following command can be used to start the single node cluster.

docker-compose up

Note that the executor will not be able to execute queries until it successfully registers with etcd.

Installing Ballista with Kubernetes

You will need a Kubernetes cluster to deploy to. I recommend using Minikube for local testing, or Amazon's Elastic Kubernetes Service (EKS).

These instructions are for using Minikube on Ubuntu.

Create a Minikube cluster

Create a Minikube cluster using the docker driver.

minikube start --driver=docker --cpus=12

Permissions

Ballista will need permissions to list pods. We will apply the following yaml to create a list-pods cluster role and bind it to the default service account in the current namespace.

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: list-pods
rules:
- apiGroups:
  - ""
  resources:
  - pods
  verbs: ["get", "watch", "list", "create", "edit", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: ballista-list-pods
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: list-pods
subjects:
- kind: ServiceAccount
  name: default
  namespace: default

With the above content saved to a rbac.yaml file, apply it with the following command.

kubectl apply -f rbac.yaml

You should see the following output:

clusterrole.rbac.authorization.k8s.io/list-pods created
clusterrolebinding.rbac.authorization.k8s.io/ballista-list-pods created

Mounting a volume

First, we need to mount the host data directory into the Minikube VM. This example assumes that the local data directory is /mnt/ and that we are going to mount it to the same path in the pod.

minikube mount /mnt:/mnt

You should see output similar to this:

Mounting host path /mnt/ into VM as /mnt ...
  Mount type:   <no value>
  User ID:      docker
  Group ID:     docker
  Version:      9p2000.L
  Message Size: 262144
  Permissions:  755 (-rwxr-xr-x)
  Options:      map[]
  Bind Address: 172.17.0.1:43715
    Userspace file server: ufs starting
Successfully mounted /mnt/ to /mnt

Next, we will apply the following yaml to create a persistent volume and a persistent volume claim so that the specified host directory is available to the containers.

apiVersion: v1
kind: PersistentVolume
metadata:
  name: data-pv
  labels:
    type: local
spec:
  storageClassName: manual
  capacity:
    storage: 10Gi
  accessModes:
    - ReadWriteOnce
  hostPath:
    path: "/mnt"
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: data-pv-claim
spec:
  storageClassName: manual
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 3Gi

Create a persistent volume.

kubectl apply -f pv.yaml

You should see the following output:

persistentvolume/data-pv created
persistentvolumeclaim/data-pv-claim created

Creating the Ballista cluster

We will apply the following yaml to create a service and a stateful set of six Rust executors. Note that you can simply change the docker image name from ballistacompute/ballista-rust to ballistacompute/ballista-jvm or ballistacompute/ballista-spark to use the JVM or Spark executor instead.

This definition will create six executors, using 1-2GB each. If you are running on a computer with limited memory available then you may want to reduce the number of replicas.

apiVersion: v1
kind: Service
metadata:
  name: ballista
  labels:
    app: ballista
spec:
  ports:
    - port: 50051
      name: flight
  clusterIP: None
  selector:
    app: ballista
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: ballista
spec:
  serviceName: "ballista"
  replicas: 6
  selector:
    matchLabels:
      app: ballista
  template:
    metadata:
      labels:
        app: ballista
        ballista-cluster: ballista
    spec:
      containers:
      - name: ballista
        image: ballistacompute/ballista-rust:0.3.0
        command: ["/executor"]
        args: ["--mode=k8s", "--external-host=0.0.0.0", "--port=50051", "--concurrent-tasks=2"]
        resources:
          requests:
            cpu: "1"
            memory: "1024Mi"
          limits:
            cpu: "2"
            memory: "2048Mi"
        ports:
          - containerPort: 50051
            name: flight
        volumeMounts:
          - mountPath: /mnt
            name: data
      volumes:
      - name: data
        persistentVolumeClaim:
          claimName: data-pv-claim

Run the following kubectl command to deploy the Ballista cluster.

kubectl apply -f ballista-cluster.yaml

You should see the following output:

service/ballista created
statefulset.apps/ballista created

Run the kubectl get pods command to confirm that the pods are running. It will take a few seconds for all of the pods to start.

kubectl get pods
NAME         READY   STATUS    RESTARTS   AGE
ballista-0   1/1     Running   0          37s
ballista-1   1/1     Running   0          33s
ballista-2   1/1     Running   0          32s
ballista-3   1/1     Running   0          30s
ballista-4   1/1     Running   0          28s
ballista-5   1/1     Running   0          27s

Port Forwarding

Run the following command to expose the service so that clients can submit queries to the cluster.

kubectl port-forward service/ballista 50051:50051

You should see the following output:

Forwarding from 127.0.0.1:50051 -> 50051
Forwarding from [::1]:50051 -> 50051

Deleting the Ballista cluster

Run the following kubectl command to delete the cluster.

kubectl delete -f ballista-cluster.yaml

Executing queries from Rust

For this example we will be querying a 600 MB CSV file from the NYC Taxi and Limousine Commission public data set, which can be downloaded from the NYC TLC website.

Create a new Rust binary application using cargo init --bin nyctaxi then add the following dependency section to the Cargo.toml file.

[dependencies]
ballista = "0.3.0"
tokio = { version = "0.2", features = ["full"] }

Replace the contents of src/main.rs with the following code.

use std::collections::HashMap;

extern crate ballista;
use ballista::arrow::util::pretty;
use ballista::dataframe::{min, max, Context};
use ballista::datafusion::logicalplan::*;
use ballista::error::Result;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = Context::remote("localhost", 50051, HashMap::new());

    let results = ctx
        .read_csv(
            "yellow_tripdata_2019-12.csv", 
            CsvReadOptions::new().has_header(true))?
        .aggregate(
            vec![col("passenger_count")], 
            vec![min(col("fare_amount")), max(col("fare_amount"))])?
        .collect()
        .await?;

    pretty::print_batches(&results)?;

    Ok(())
}

Execute the code using cargo run and it should produce the following output.

+-----------------+---------------+---------------+
| passenger_count | MIN(fare_amt) | MAX(fare_amt) |
+-----------------+---------------+---------------+
| 9               | -90           | 98            |
| 0               | -78           | 900           |
| 3               | -280          | 499           |
| 4               | -250          | 700           |
| 7               | 0.77          | 78            |
| 8               | 8             | 88            |
| 2               | -1472         | 1472          |
| 6               | -65           | 544.5         |
| 1               | -600          | 398468.38     |
| 5               | -52           | 442           |
+-----------------+---------------+---------------+
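The aggregate query above computes, for each distinct passenger_count, the minimum and maximum fare_amount. As a hypothetical single-process sketch in plain Rust (the function name and tuple representation are invented for illustration, not part of the Ballista API), the same computation looks like this:

```rust
use std::collections::HashMap;

// For each group key (passenger_count), track (min, max) of fare_amount.
fn min_max_by_group(rows: &[(i64, f64)]) -> HashMap<i64, (f64, f64)> {
    let mut acc: HashMap<i64, (f64, f64)> = HashMap::new();
    for &(passenger_count, fare) in rows {
        let entry = acc.entry(passenger_count).or_insert((fare, fare));
        if fare < entry.0 {
            entry.0 = fare;
        }
        if fare > entry.1 {
            entry.1 = fare;
        }
    }
    acc
}

fn main() {
    let rows = [(1, 5.0), (1, 12.5), (2, 7.0), (2, 3.5)];
    let result = min_max_by_group(&rows);
    assert_eq!(result[&1], (5.0, 12.5));
    assert_eq!(result[&2], (3.5, 7.0));
}
```

Ballista distributes this work by computing partial min/max accumulators on each executor's partitions and then merging them, which is why min and max parallelize well.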

Reference

This page documents which operators and expressions are currently supported by the various executor implementations.

Operators

Feature    | Rust | JVM | Spark
-----------|------|-----|------
Projection | Yes  | Yes | Yes
Selection  | Yes  | Yes | Yes
Aggregate  | Yes  | Yes | Yes
Sort       | No   | No  | No
Limit      | No   | No  | No
Offset     | No   | No  | No
Join       | No   | No  | No
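The three supported operators map naturally onto familiar iterator combinators. The following std-only Rust sketch (the function name and data are invented for illustration) shows what each operator does to a stream of rows:

```rust
// Projection -> map    (choose or compute columns)
// Selection  -> filter (keep rows matching a predicate)
// Aggregate  -> fold   (reduce rows to a summary value)
fn total_fare_multi_passenger(rows: &[(i64, f64)]) -> f64 {
    rows.iter()
        .filter(|(passenger_count, _)| *passenger_count > 1) // Selection
        .map(|(_, fare_amount)| *fare_amount)                // Projection
        .fold(0.0, |acc, fare| acc + fare)                   // Aggregate
}

fn main() {
    let fares = [(1, 10.0), (3, 25.0), (1, 4.0), (5, 60.0)];
    assert_eq!(total_fare_multi_passenger(&fares), 85.0);
}
```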

Expressions

Expression                                    | Rust | JVM | Spark
----------------------------------------------|------|-----|------
Column reference                              | Yes  | Yes | Yes
Literal values (double, long, string)         | Yes  | Yes | Yes
Arithmetic expressions (+, -, *, /)           | Yes  | Yes | Yes
Simple aggregates (min, max, sum, avg, count) | Yes  | Yes | Yes
Comparison operators (=, !=, <, <=, >, >=)    | Yes  | Yes | Yes
CAST(expr AS type)                            | No   | Yes | No
Boolean operators (AND, OR, NOT)              | No   | Yes | Yes

Frequently Asked Questions

What is the relationship between DataFusion and Ballista?

DataFusion is a library for executing queries in-process using the Apache Arrow memory model and computational kernels. It is designed to run within a single process, using threads for parallel query execution.
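The "threads for parallel query execution" model can be sketched in std-only Rust (no DataFusion dependency; the function name and partitioning scheme are invented for illustration): the data is split into partitions, each partition is processed on its own thread, and the partial results are merged.

```rust
use std::thread;

// Split the data into partitions, sum each partition on its own thread,
// then combine the partial sums.
fn parallel_sum(data: Vec<i64>, num_partitions: usize) -> i64 {
    // Ceiling division, with a floor of 1 so chunks() never gets size 0.
    let chunk = ((data.len() + num_partitions - 1) / num_partitions).max(1);
    let handles: Vec<_> = data
        .chunks(chunk)
        .map(|c| c.to_vec())
        .map(|part| thread::spawn(move || part.iter().sum::<i64>()))
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).sum()
}

fn main() {
    // Sum 1..=100 across four partitions.
    let total = parallel_sum((1..=100).collect(), 4);
    assert_eq!(total, 5050);
}
```

Ballista generalizes this pattern across processes and machines: the partitions live on different executors, and Arrow Flight is used to move intermediate results instead of an in-process join.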

Ballista is a distributed compute platform designed to leverage DataFusion and other query execution libraries.