Tutorial 4: Distributed Machine Learning

How to use this tutorial

  • This tutorial is also available in Jupyter notebook format. To access and run the Jupyter notebook version of the tutorial, please sign up for free developer access by following instructions at https://github.com/juliustechco/juliusgraph.
  • Additional resources (video demos & blogs) are available at http://juliustech.co.
  • To report bugs or request new features, please raise an issue here. To schedule a live demo, please go to http://juliustech.co. Please check out this FAQ page or email us at info@juliustech.co for other general inquiries.
  • This tutorial is copyrighted by Julius Technologies, and its use is governed by the terms of use.

In this tutorial, we use the Julius GraphEngine and its Domain Specific Language (RuleDSL) to build a distributed Machine Learning (ML) pipeline that can process a large volume of data in parallel for model training and inference.

Introduction

In the real world, ML training or inference data can be too large to fit into the memory of a single computer. This is a common challenge ML engineers face when productionizing an ML model. There is a popular blog post, Training models when data doesn't fit in memory, describing how to use Dask (a popular Python distributed computing package) to build distributed ML pipelines that can process large training and inference data in batches.

In this tutorial, we are going to replicate the functionality described in the blog post using the Julius GraphEngine instead of using Dask or Dagger.jl (which is a Julia package inspired by Dask). We will show how to achieve better results versus the original Dask blog with considerably fewer lines of code. Here, we will re-use the generic MapReduce pipeline developed in a previous tutorial. At the end of this tutorial, we will compare Julius with Dask, and summarize the key differences.

We use the same fraud detection dataset as the Dask blog, which originated from Kaggle. The realization of fraud is indicated by the column isFraud, which is the outcome the ML model tries to predict.

1. Simple ML Pipeline

As a baseline solution, we first build a simple ML pipeline in Julius without batching or distribution. We then describe, step by step, how to adapt it to support batching and distribution. This mirrors a typical ML model development workflow, where a data scientist first builds an ML model using a simple data pipeline, and a data engineer then parallelizes the implementation to handle large volumes of data. Productionizing an ML model often takes multiple iterations between data scientists and data engineers, and is one of the most time-consuming and costly steps of an ML model's life cycle. By following the same process in this tutorial, we illustrate how easy it is to move an ML pipeline from development to production using Julius.

In order to match the numbers in the original Dask blog, we use the same Python ML models from sklearn. Julius can interoperate and integrate with all major programming languages, such as Python, C/C++, Java, R, .Net, and Julia.

The fraud detection ML pipeline, as described in the Dask blog, consists of the following steps:

  • read datasets,
  • separate the features from the target variable,
  • train an ML model (an ExtraTreesClassifier),
  • infer using test data,
  • compute the AUC score.
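
For reference, the sketch below shows roughly what these steps look like in plain Julia, calling sklearn directly through PyCall rather than through Julius. It is illustrative only: it assumes the CSV, DataFrames and PyCall packages plus a Python environment with scikit-learn are installed, and that the prepared CSV files contain only numeric feature columns, as in this tutorial's data.

using CSV, DataFrames, PyCall

# load the prepared train/test CSV files (the same paths used later in this tutorial)
train = CSV.read(joinpath(@__DIR__, "..", "data/train_fraud.csv"), DataFrame)
test  = CSV.read(joinpath(@__DIR__, "..", "data/test_fraud.csv"), DataFrame)

# separate the features from the target variable
target = :isFraud
Xtrain, ytrain = Matrix(select(train, Not(target))), train[!, target]
Xtest,  ytest  = Matrix(select(test,  Not(target))), test[!, target]

# train an ExtraTreesClassifier, infer on the test data, and compute the AUC score
ensemble = pyimport("sklearn.ensemble")
metrics  = pyimport("sklearn.metrics")
clf = ensemble.ExtraTreesClassifier(n_estimators=10, min_samples_leaf=10)
clf.fit(Xtrain, ytrain)
proba = clf.predict_proba(Xtest)[:, 2]    # probability of the fraud class
metrics.roc_auc_score(ytest, proba)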

We now proceed to the implementation. First, we import the required Julia and Julius packages:

using GraphEngine: RuleDSL, GraphVM
using DataFrames, DataScience, StatsBase
using AtomExt

The following few rules define the entire ML pipeline; they are largely self-explanatory and roughly follow the steps listed above.

RuleDSL.@addrules ml begin

    # selects columns `cols` from a DataFrame
    select(
        ref::RuleDSL.NodeRef, cols::Any;
        label="$(isa(cols, InvertedIndex) ? "col != $(cols.skip)" : "col == $(cols)")"
    ) = begin
        DataScience.ApplyFn[x::DataFrame -> DataFrames.select(x, cols; copycols=false)](ref...)
    end

    # any `sklearn` ML model can be easily handled by overloading the `classifiertrain`
    # rule, as follows
    classifiertrain(
        model::Val{:ExtraTreesClassifier},
        options::Dict,
        trainxs::RuleDSL.NodeRef,
        trainy::RuleDSL.NodeRef;
        label="$model train"
    ) = begin
        DataScience.PyTrain["sklearn.ensemble.ExtraTreesClassifier", options](trainxs..., trainy...)
    end

    # this rule makes the predictions
    classify(
        train_data::RuleDSL.NodeRef, target::Symbol, model::Val, options::Dict, testx::RuleDSL.NodeRef;
        label="$model inference"
    ) = begin
        train_data_X = RuleDSL.@ref ml.select(train_data, Not(target))
        train_data_y = RuleDSL.@ref ml.select(train_data, target)
        trained = RuleDSL.@ref ml.classifiertrain(model, options,  train_data_X, train_data_y )
        DataScience.PyPredict(trained..., testx...)
    end

    # makes predictions and selects the `:proba` column for the resulting DataFrame
    classifyprob(
        train_data::RuleDSL.NodeRef,
        target::Symbol,
        model::Val,
        options::Dict,
        test_data::RuleDSL.NodeRef;
        label="prob"
    ) = begin
        testx = RuleDSL.@ref ml.select(test_data, Not(target))
        DataScience.ApplyFn[
            x::DataFrame -> DataFrames.select(x, :proba; copycols=false)
        ](classify(train_data, target, model, options, testx))
    end

    # computes the AUC score
    score(realized::RuleDSL.NodeRef, probs::RuleDSL.NodeRef) = begin
        DataScience.PyScore(realized..., probs...)
    end
end
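
Because classifiertrain dispatches on the model's Val type, supporting another sklearn model only requires one more overload mirroring the rule above. The following is a hypothetical sketch for a RandomForestClassifier (not used elsewhere in this tutorial); only the model symbol and the Python class path change:

RuleDSL.@addrules ml begin
    # hypothetical overload for another sklearn model, mirroring the rule above
    classifiertrain(
        model::Val{:RandomForestClassifier},
        options::Dict,
        trainxs::RuleDSL.NodeRef,
        trainy::RuleDSL.NodeRef;
        label="$model train"
    ) = begin
        DataScience.PyTrain["sklearn.ensemble.RandomForestClassifier", options](trainxs..., trainy...)
    end
end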

PyTrain, PyPredict, and PyScore are three useful atoms provided by the DataScience package; they wrap the training, inference, and roc_auc_score methods of the sklearn Python package. The PyTrain atom is generic: it can instantiate any Python ML model from its name and parameters, as shown in the ml.classifiertrain rule. Using this set of rules, we can create a simple ML pipeline:

# we use existing rules in ds namespace to read CSV files from a shared drive
train_data_file = joinpath(@__DIR__, "..", "data/train_fraud.csv")
test_data_file = joinpath(@__DIR__, "..", "data/test_fraud.csv")
train_data = RuleDSL.@ref ds.csvsrc(train_data_file, true; label="train data")
test_data  = RuleDSL.@ref ds.csvsrc(test_data_file, true; label="test data")

target = :isFraud
model = Val(:ExtraTreesClassifier)
options = Dict(:n_estimators => 10, :min_samples_leaf => 10)

pred = RuleDSL.@ref ml.classifyprob(train_data, target, model, options, test_data)
test_data_y = RuleDSL.@ref ml.select(test_data, target)
mlscore = RuleDSL.@ref ml.score(test_data_y, pred);
config = RuleDSL.Config()
gs1 = GraphVM.createlocalgraph(config, RuleDSL.GenericData())
GraphVM.calcfwd!(gs1, Set([mlscore]));

We have now created a simple ML training and inference pipeline without using any batching or distribution. Julius provides an easy-to-use web UI for users to navigate and visualize the resulting data and logic in the computation graph. The following code block starts a local server for the web UI so that we can retrieve the resulting data from the graph.

using GraphIO

# a container of graphs
gss = Dict{String,RuleDSL.AbstractGraphState}()

# used for WebUI display purposes
port = GraphVM.drawdataport()
@async GraphVM.startresponder(gss, port);

The Julius package GraphIO provides several convenience functions for retrieving and displaying graphs in SVG format. Users can also view the graph data interactively by clicking on the url below to bring up the full web UI.

svg = GraphIO.postlocalgraph(gss, gs1, port, true; key="single");
GraphIO.postsvg(svg, "ml_pipeline_1.svg")
view graph data at http://127.0.0.1:8080/ui/depgraph.html?dataurl=127.0.0.1:7138_single

Figure 1 - Simple ML Pipeline.

Then, the AUC score value obtained using the complete dataset is:

RuleDSL.getdata(gs1, mlscore, 1)
0.9878845834626067

1.1 Down Sampling

Downsampling is a common technique used to reduce the training data size so that training runs faster. The Dask blog implemented 5% downsampling while keeping the fraction of real fraud constant. We replicate the same downsampling scheme with a single Julia function and a single rule in Julius:

using Random
using StatsBase

function downsample(ycol::Symbol, frac::Float64, df::DataFrame)
    # get filtered DataFrames with true/false cases for isFraud
    positive = DataFrames.filter(row -> isequal(row[ycol], true), df)
    negative = DataFrames.filter(row -> isequal(row[ycol], false), df)

    # sample with replacement each DataFrame
    dspositive = positive[sample(1:nrow(positive), round(Int, frac * nrow(positive)), replace=true), :]
    dsnegative = negative[sample(1:nrow(negative), round(Int, frac * nrow(negative)), replace=true), :]

    # concatenate both sampled DataFrames
    merged = vcat(dspositive, dsnegative)

    # shuffle rows before returning
    return merged[shuffle(1:nrow(merged)), :]
end

RuleDSL.@addrules ml begin
    downsample(
        raw::RuleDSL.NodeRef, ycol::Symbol, frac::Float64
    ) = begin
        DataScience.ApplyFn[downsample, ycol, frac](raw...)
    end
end
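
Before wiring downsample into the graph, we can sanity-check the plain Julia function on a small synthetic DataFrame (an illustrative example, not part of the tutorial's pipeline):

using DataFrames, Random

Random.seed!(1)
toy = DataFrame(isFraud = rand(10_000) .< 0.01, amount = rand(10_000))
small = downsample(:isFraud, 0.05, toy)
nrow(small)                          # roughly 500 rows, i.e. 5% of the original
sum(small.isFraud) / nrow(small)     # fraud fraction stays close to ...
sum(toy.isFraud) / nrow(toy)         # ... the original fraction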

Let's test the downsampling:

sampleratio = 0.05
downsamples = RuleDSL.@ref ml.downsample(train_data, target, sampleratio)
ml:downsample/ds:csvsrc/train data
gs2 = GraphVM.createlocalgraph(config, RuleDSL.GenericData())
GraphVM.calcfwd!(gs2, Set([downsamples]))

svg = GraphIO.postlocalgraph(gss, gs2, port; key="downsample");
GraphIO.postsvg(svg, "ml_pipeline_2.svg")
view graph data at http://127.0.0.1:8080/ui/depgraph.html?dataurl=127.0.0.1:7138_downsample

Figure 2 - Down Sample

We can verify that the fraud frequency remains essentially unchanged, with the small remaining difference due to sampling and rounding.

sample_df = RuleDSL.getdata(gs2, downsamples, 1)
sum(sample_df.isFraud) / size(sample_df, 1) * 100
0.12966091705630425
# use full data set to verify
using CSV
df = CSV.read(train_data_file, DataFrames.DataFrame)
sum(df.isFraud) / size(df, 1) * 100
0.12828849784581414

It is easy to modify the existing ML pipeline to include downsampling: we simply replace train_data with downsamples:

downproba = RuleDSL.@ref ml.classifyprob(downsamples, target, model, options, test_data)
downscore = RuleDSL.@ref ml.score(test_data_y, downproba)

gs3 = GraphVM.createlocalgraph(config, RuleDSL.GenericData())
GraphVM.calcfwd!(gs3, Set([downscore]))

svg = GraphIO.postlocalgraph(gss, gs3, port, true; key="downscore");
GraphIO.postsvg(svg, "ml_pipeline_3.svg")
view graph data at http://127.0.0.1:8080/ui/depgraph.html?dataurl=127.0.0.1:7138_downscore

Figure 3 - ML with Down Sample

Here, the AUC score obtained using downsampling is slightly less than that from using the full training data, as expected.

RuleDSL.getdata(gs3, downscore, 1)
0.9715159105488534

We have now built the baseline ML pipeline, in which the entire training and inference data is processed all at once. An obvious downside of this implementation is that it cannot handle training or inference data that does not fit into the computer's memory. Now let's productionize the pipeline by adding batching and distribution.

2. ML Pipeline with Batching

It is a common strategy to break the training data into multiple batches and train a separate ML model on each batch. Once we have multiple trained models, we can average their inferences for better accuracy. This strategy of boosting accuracy by combining multiple trained models is commonly called "bagging". Batching and bagging are often used together, allowing large training data to be split across multiple machines and processed in parallel.

2.1 Training Data Batching

We use the convenience type DDataFrame, provided by the Julius DataScience package, to create a vector of RuleDSL.NodeRef representing roughly equal-sized chunks of the large input CSV file.

train_ddf = DataScience.DDataFrame(train_data_file, blocksize="5 MB")
train_batches = train_ddf.chunks
down_batches = RuleDSL.@ref(ml.downsample(b, target, sampleratio) for b in train_batches)
8-element Vector{NodeRef}:
 ml:downsample/dd:read_csv_blocks//home/runner/work/Tutorials/Tutorials/docs/../data/train_fraud.csv
 ml:downsample/dd:read_csv_blocks//home/runner/work/Tutorials/Tutorials/docs/../data/train_fraud.csv
 ml:downsample/dd:read_csv_blocks//home/runner/work/Tutorials/Tutorials/docs/../data/train_fraud.csv
 ml:downsample/dd:read_csv_blocks//home/runner/work/Tutorials/Tutorials/docs/../data/train_fraud.csv
 ml:downsample/dd:read_csv_blocks//home/runner/work/Tutorials/Tutorials/docs/../data/train_fraud.csv
 ml:downsample/dd:read_csv_blocks//home/runner/work/Tutorials/Tutorials/docs/../data/train_fraud.csv
 ml:downsample/dd:read_csv_blocks//home/runner/work/Tutorials/Tutorials/docs/../data/train_fraud.csv
 ml:downsample/dd:read_csv_blocks//home/runner/work/Tutorials/Tutorials/docs/../data/train_fraud.csv

Training data batching and bagging can be implemented with the following single rule, which simply re-uses the previous ml.classifyprob rule for each input batch and averages their outputs.

# compute the average of multiple dataframes
function dfmean(dfs::DataFrame...)
    df = reduce(.+, dfs)
    df ./ (length(dfs))
end

RuleDSL.@addrules ml begin
    bagpred(
        test::RuleDSL.NodeRef,
        model::Val,
        options::Dict,
        train_batches::Vector{RuleDSL.NodeRef},
        target::Symbol
    ) = begin
        refs = RuleDSL.@ref((ml.classifyprob(b, target, model, options, test) for b in train_batches))
        DataScience.ApplyFn[dfmean](refs...)
    end
end
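
As a quick illustrative check of dfmean (assuming DataFrames is loaded), averaging two single-column prediction frames element-wise gives:

p1 = DataFrame(proba = [0.10, 0.80, 0.30])
p2 = DataFrame(proba = [0.20, 0.60, 0.50])
dfmean(p1, p2)     # 3×1 DataFrame with proba = [0.15, 0.70, 0.40]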

bagpred = RuleDSL.@ref ml.bagpred(test_data, model, options, down_batches, target)
bagscore = RuleDSL.@ref ml.score(test_data_y, bagpred)

gs4 = GraphVM.createlocalgraph(config, RuleDSL.GenericData())
GraphVM.calcfwd!(gs4, Set([bagscore]))

svg = GraphIO.postlocalgraph(gss, gs4, port; key="ml");
GraphIO.postsvg(svg, "ml_pipeline_4.svg")
view graph data at http://127.0.0.1:8080/ui/depgraph.html?dataurl=127.0.0.1:7138_ml

Figure 4 - Batching & Bagging in Training Data

The AUC score obtained using multiple samples of the data is:

RuleDSL.getdata(gs4, bagscore, 1)
0.95354937716486

2.2 Batching both Training and Prediction Data

In the previous implementation, the training data is batched but the inference data is not. In practice, the inference data could also be too large to fit into the memory of a single machine, in which case it must be batched as well. Batching both training and inference data is a considerably more complex process. In Julius, however, we can leverage the generic MapReduce pattern to define this complicated pipeline with very little coding.

We first use DataScience.DDataFrame to create a vector of RuleDSL.NodeRef for the inference data.

test_ddf = DataScience.DDataFrame(test_data_file, blocksize="2.5 MB")
test_batches = test_ddf.chunks
4-element Vector{NodeRef}:
 dd:read_csv_blocks//home/runner/work/Tutorials/Tutorials/docs/../data/test_fraud.csv
 dd:read_csv_blocks//home/runner/work/Tutorials/Tutorials/docs/../data/test_fraud.csv
 dd:read_csv_blocks//home/runner/work/Tutorials/Tutorials/docs/../data/test_fraud.csv
 dd:read_csv_blocks//home/runner/work/Tutorials/Tutorials/docs/../data/test_fraud.csv

The doubly batched training and inference ML pipeline naturally maps to a MapReduce pipeline as the following:

  • mapper: compute the bagged inference of a single test batch from multiple trained models. This stage already includes training data batching.
  • shuffler/reducer: collect the individual batch inferences and concatenate them to form the complete inference.
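
As a plain-Julia toy (not the Julius mr.* rules), the data flow of this MapReduce pattern looks roughly like the sketch below: mapped results are keyed, keys are routed to a fixed number of pipelines, each pipeline concatenates what it receives, and a final step aggregates across pipelines.

batches  = [1:3, 4:6, 7:9, 10:12]                    # stand-ins for the test batches
mapped   = [hash(b) => collect(b) for b in batches]  # mapper: keyed partial results
npipes   = 4
shuffled = [[kv for kv in mapped if kv.first % npipes == i] for i in 0:npipes-1]
reduced  = [vcat(last.(grp)...) for grp in shuffled] # reducer: concatenate within a pipeline
final    = vcat(reduced...)                          # final aggregation across pipelines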

The following batchpred rule extracts both the realization and the inference for a single test batch into one DataFrame, and assigns it a unique key using the hash of the batch file's NodeRef.

RuleDSL.@addrules ml begin
    # extract both realization and prob predictions
    batchpred(
        test::RuleDSL.NodeRef,
        model::Val,
        options::Dict,
        train_batches::Vector{RuleDSL.NodeRef},
        target::Symbol
    ) = begin
        DataScience.ApplyFn[
            (ind, prob)->[hash(test) => hcat(ind, prob)]
        ](select(test, target), bagpred(test, model, options, train_batches, target))
    end
end

# extracts the DataFrames produced by `batchpred` for all batches and concatenates them
function valcat(xs::Vector...)
    agg = DataFrame()
    for (_, v) in vcat(xs...)
        agg = vcat(agg, v)
    end
    return agg
end
valcat (generic function with 1 method)
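
To make the aggregation concrete, here is an illustrative call of valcat on hand-made inputs (assuming DataFrames is loaded); each argument is a vector of key => DataFrame pairs, as produced by batchpred, and the keys are dropped while the DataFrames are stacked:

b1 = [hash(:batch1) => DataFrame(isFraud = [true, false],  proba = [0.9, 0.2])]
b2 = [hash(:batch2) => DataFrame(isFraud = [false, false], proba = [0.1, 0.3])]
valcat(b1, b2)     # 4×2 DataFrame stacking both batches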

The following is the entire definition of the doubly batched ML pipeline using the MapReduce pattern:

_mapper = RuleDSL.@ref ml.batchpred(model, options, down_batches, target)

# map all batches to 4 pipelines before reducing
_shuffler = RuleDSL.@ref mr.shuffler(first, 4)

# simply concatenates all the vectors in a given pipeline
_reducer = RuleDSL.@ref mr.reducer(vcat)

# valcat extracts the DataFrame from each batch and concatenates them together
mrpred = RuleDSL.@ref mr.mapreduce(test_batches, _mapper, _shuffler, _reducer, valcat)
mr:mapreduce/NodeRef[4]

The last step in MapReduce is to aggregate the mapper/shuffler results from the individual test batches. The generic mr.mapreduce rule accepts an optional function as its last parameter to customize this aggregation. Here the function valcat performs the aggregation, concatenating the individual batches' fraud realizations and inferences into a single DataFrame.

mrscore = RuleDSL.@ref ml.score(RuleDSL.@ref(ml.select(mrpred, :isFraud)), RuleDSL.@ref(ml.select(mrpred, :proba)))

gs5 = GraphVM.createlocalgraph(config, RuleDSL.GenericData())
GraphVM.calcfwd!(gs5, Set([mrscore]))

svg = GraphIO.postlocalgraph(gss, gs5, port; key="mapred");
GraphIO.postsvg(svg, "ml_pipeline_5.svg")
view graph data at http://127.0.0.1:8080/ui/depgraph.html?dataurl=127.0.0.1:7138_mapred

Figure 5 - Doubly Batching in Training and Inference

The corresponding AUC score from the doubly batched pipeline is similar:

RuleDSL.getdata(gs5, mrscore, 1)
0.9906256229475517

3. Distributed ML Pipeline

We have now adapted the simple ML pipeline to support batching of both the training and inference data. However, the real benefits of batching come from distributing the data and computation across multiple computers, so that we can process large volumes of data in parallel.

Using Julius, it is effortless to distribute the batched pipeline to multiple computers and run it in parallel. Let's use the doubly batched ML pipeline as an example to show how easy it is to distribute. We first connect to a remote cluster with 4 worker instances, and import the necessary packages on the remote cluster:

using GraphEngine: RuleDSL, GraphVM

config = RuleDSL.newconfig(RuleDSL.Config(), :project => "MapReduce")
balancer = GraphVM.GlobalUnique()
my_domain = GraphVM.mydomain()

# draw a port number to start the local cluster service
remoteport = GraphVM.drawdataport()
7455
# start a local master service at the given port, which mimics a remote cluster

gs0 = GraphVM.RemoteGraphProxy(my_domain => 7225)
GraphVM.@rpc GraphVM.startlocalmasterservice(gs0, remoteport, 4)
gs = GraphVM.RemoteGraphProxy(config, my_domain => remoteport, balancer, GraphVM.GenericData())
GraphVM.wait4clusterinit(gs)
Dict{UInt64, Pair{Float64, GraphEngine.GraphVM.WorkerStatus}} with 4 entries:
  0x18fdae7dd515c15a => 1.65314e9=>Ready
  0x248c936e6c330a93 => 1.65314e9=>Ready
  0x0588c4f52e2b6c09 => 1.65314e9=>Ready
  0x8cf52965bd043568 => 1.65314e9=>Ready
GraphVM.@remote_eval gs begin
    using GraphEngine: RuleDSL, GraphVM
    using DataScience, Random, AtomExt, GraphIO
    using StatsBase, DataFrames
end

GraphVM.waitcheckstatus(gs, RuleDSL.getconfig(config, :project));

We now load the entire doubly batched ML pipeline onto the remote cluster by sending the code we have written so far. The following is the complete code needed to replicate the Dask blog in Julius: only 8 rules and about 50 lines of code in total.

GraphVM.@remote_eval gs  begin
    function downsample(ycol::Symbol, frac::Float64, df::DataFrame)
        positive = DataFrames.filter(row -> isequal(row[ycol], true), df)
        negative =  DataFrames.filter(row -> isequal(row[ycol], false), df)
        dspositive = positive[sample(1:nrow(positive), round(Int, frac * nrow(positive)), replace=true), :]
        dsnegative = negative[sample(1:nrow(negative), round(Int, frac * nrow(negative)), replace=true), :]
        merged = vcat(dspositive, dsnegative)
        return merged[shuffle(1:nrow(merged)), :]
    end

    function dfmean(dfs::DataFrame...)
        df = reduce(.+, dfs)
        return df ./ (length(dfs))
    end

    function valcat(xs::Vector...)
        agg = DataFrame()
        for (_, v) in vcat(xs...)
            agg = vcat(agg, v)
        end
        return agg
    end
end

GraphVM.waitcheckstatus(gs, RuleDSL.getconfig(config, :project));

GraphVM.@addrules gs ml begin
    select(ref::RuleDSL.NodeRef, cols::Any; label="$(isa(cols, InvertedIndex) ? "col != $(cols.skip)" : "col == $(cols)")") = DataScience.ApplyFn[x::DataFrame->DataFrames.select(x, cols; copycols=false)](ref...)
    classifiertrain(model::Val{:ExtraTreesClassifier}, options::Dict, trainxs::RuleDSL.NodeRef, trainy::RuleDSL.NodeRef; label="$model train") = DataScience.PyTrain["sklearn.ensemble.ExtraTreesClassifier", options](trainxs..., trainy...)
    classify(train_data::RuleDSL.NodeRef, target::Symbol, model::Val, options::Dict, testx::RuleDSL.NodeRef; label="$model inference") = begin
        train_data_X = RuleDSL.@ref ml.select(train_data, Not(target))
        train_data_y = RuleDSL.@ref ml.select(train_data, target)
        trained = RuleDSL.@ref ml.classifiertrain(model, options,  train_data_X, train_data_y )
        DataScience.PyPredict(trained..., testx...)
    end
    classifyprob(train_data::RuleDSL.NodeRef, target::Symbol, model::Val, options::Dict, test_data::RuleDSL.NodeRef; label="prob") = begin
        testx = RuleDSL.@ref ml.select(test_data, Not(target))
        DataScience.ApplyFn[x::DataFrame->DataFrames.select(x, :proba; copycols=false)](classify(train_data, target, model, options, testx))
    end
    score(realized::RuleDSL.NodeRef, probs::RuleDSL.NodeRef)=DataScience.PyScore(realized..., probs...)
    downsample(raw::RuleDSL.NodeRef, ycol::Symbol, frac::Float64)=DataScience.ApplyFn[Main.downsample, ycol, frac](raw...)
    bagpred(test::RuleDSL.NodeRef, model::Val, options::Dict, train_batches::Vector{RuleDSL.NodeRef}, target::Symbol) = DataScience.ApplyFn[Main.dfmean](RuleDSL.@ref((ml.classifyprob(b, target, model, options, test) for b = train_batches))...)
    batchpred(test::RuleDSL.NodeRef, model::Val, options::Dict, train_batches::Vector{RuleDSL.NodeRef}, target::Symbol) = DataScience.ApplyFn[(ind, prob)->[hash(test) => hcat(ind, prob)]](select(test, target), bagpred(test, model, options, train_batches, target))
end

GraphVM.waitcheckstatus(gs, RuleDSL.getconfig(config, :project));

Afterwards, we can create and run the doubly batched pipeline on the cluster.

mrpred = RuleDSL.@ref mr.mapreduce(test_batches, _mapper, _shuffler, _reducer, Main.valcat)
mrscore = RuleDSL.@ref ml.score(RuleDSL.@ref(ml.select(mrpred, :isFraud)), RuleDSL.@ref(ml.select(mrpred, :proba)))

# select all the nodes with rule name classifiertrain, splitbykey and reducer in alljobs
keyjobs, ds = RuleDSL.jobdeps(config, [mrscore], [:classifiertrain, :splitbykey, :reducer]);
GraphVM.initgraph!(gs)

# distribute the nodes in alljobs to workers
GraphVM.dispatchjobs!(gs, keyjobs; nocopy=Set([:splitbykey]))
GraphVM.waitcheckstatus(gs, RuleDSL.getconfig(config, :project));

In the code block above, we first find the keyjobs, a vector of NodeRef holding all the nodes in the graph associated with the rule names classifiertrain, splitbykey and reducer. These nodes are the main points of computation and data aggregation. A developer can use their domain knowledge of the specific workload to select the best set of rule names for the key jobs. The last key job is always the final node the user wants to compute, which is mrscore in the cell above. Once these key jobs (or key nodes) are determined, we send them to the workers by calling GraphVM.dispatchjobs!. The optional nocopy parameter specifies rule names whose nodes shall not be copied over the network; it is meant for rules/nodes holding large amounts of data that are expensive to copy. If a node is designated as nocopy, the Julius Rule Engine automatically places its immediate dependencies on the same worker to avoid the data copy. In the example above, the splitbykey nodes were flagged as nocopy to ensure that the mapping results from a given input batch exist on only one worker, in order to conserve memory.

The way Julius distributes the jobs to workers is fundamentally different from Dask or Dagger.jl. In Dask or Dagger.jl, every node in their graphs represents a separate task that must be sent to a worker for execution individually. In Julius, since every worker holds the entire graph logic as defined by the RuleDSL, the dispatcher does not need to send every node to the worker, instead only a few key nodes are sent as a list of NodeRef objects. Once a worker receives a job definition in NodeRef, it can create any dependent nodes from the shared RuleDSL definitions. As a result, the Julius distribution requires much less communication between the dispatcher and the workers, thus incurring much less overhead.

The following cell shows that only 17 nodes need to be sent to the workers out of the 156 nodes in the graph. GraphVM.@rpc is a convenience macro that facilitates a remote procedure call via a GraphVM.RemoteGraphProxy object.

gstats = GraphVM.@rpc GraphIO.graphstats(gs)
println("length of keyjobs = ", length(keyjobs))
println("graph node cnt = ", gstats[:cnt])
length of keyjobs = 17
graph node cnt = 156

By creating dependent nodes at the workers, Julius also achieves much better locality for graph execution, since all the dependent nodes created by a worker are local. As a result, Julius graph distribution and execution are much more efficient than Dask's or Dagger.jl's. Julius can easily distribute graphs with hundreds of millions of nodes, and even then the number of key nodes to distribute is rarely more than a few thousand. In comparison, Dask and Dagger.jl suffer significant overhead on large graphs because every single node must be distributed; developers are therefore advised to "avoid large graphs" when using Dask, whereas Julius has no such limitation.

svg = GraphIO.postremotegraph(gs, remoteport; maxnode=UInt(200));
GraphIO.postsvg(svg, "ml_pipeline_6.svg")

Figure 6 - Distributed ML Pipeline.

Data from the remote cluster is easily accessible using Julius' remote RPC interface.

GraphVM.@rpc GraphVM.onnode(gs, :getdata, UInt[0], hash(mrscore), 1)
0.9793635977178947

The resulting graph uses colors to indicate the placement of nodes on individual worker instances: each color represents one physical worker. Every arrow connecting two nodes of different colors implies a network data transfer. The entire graph distribution is handled automatically by Julius, without the developer changing a single line of RuleDSL or Atom code.

Upon close examination, we observe that the resulting MapReduce distribution is optimal: the workload is evenly distributed amongst the 4 workers, and there is no unnecessary data transfer between different physical computers (represented by different colors).

Neither the training nor the inference input data is ever aggregated onto a single computer, so we stay within an individual worker's memory limit. Only the realization and inference outputs, just two columns of data, are aggregated onto a single machine; a tiny fraction of the entire inference input data set.

4. Streaming

Besides batching, streaming is another effective strategy to minimize memory usage. Instead of processing the entire data set at once, we could break it into multiple batches and process them sequentially in time. Streaming was not implemented in the original Dask blog, as the DAG created by Dask does not support streaming.

In contrast, any pipeline created in Julius can be run in streaming mode with a few lines of code. In streaming mode, the RuleDSL.fwddata! method is called multiple times with different streaming inputs. Here, we implement a few generic Atom types that act as the stream source, running average, and recorder, so that we can express the rich logic and behaviors of stream processing.

# records the streaming value x
RuleDSL.@datom Record begin
    values::Vector = []

    function fwddata!(x::Any)
        push!(values, x)
    end
end

# computes the running average of all the values being streamed
RuleDSL.@datom RunningAverage begin
    sum::DataFrame = DataFrame()
    cnt::Vector = [0]

    function fwddata!(x::DataFrame)
        if cnt[1] == 0
            append!(sum, x)
        else
            sum .+= x
        end

        cnt[1] += 1
        [ sum ./ cnt[1] ]
    end
end

# sequentially returns the values represented in batchsrc, one per fwddata! call
# this only works for a source node, i.e., a NodeRef without any further dependencies
RuleDSL.@datom StreamSrc begin
    config::RuleDSL.Config
    batchsrc::Vector{RuleDSL.NodeRef}
    idx::Vector = [0]

    function fwddata!()
        thissrc = batchsrc[idx[1] % length(batchsrc) + 1]
        atom = RuleDSL.calcop(config, thissrc)
        idx[1] += 1
        RuleDSL.fwddata!(atom)
    end
end
fwddata! (generic function with 93 methods)
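
To see what the RunningAverage atom computes, the small plain-Julia loop below reproduces its recurrence outside the graph: after the n-th streamed DataFrame, the atom emits the element-wise mean of the first n inputs (illustrative only).

xs = [DataFrame(proba = [0.2]), DataFrame(proba = [0.6]), DataFrame(proba = [0.7])]
s = DataFrame()
avgs = DataFrame[]
for (n, x) in enumerate(xs)
    n == 1 ? append!(s, x) : (s .= s .+ x)   # accumulate the element-wise sum
    push!(avgs, s ./ n)                      # running average after n batches
end
avgs     # running averages of proba: 0.2, 0.4, 0.5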

We then add a few generic high-level rules that wire these Atoms into the graph as the stream source, running average, and recorder:

RuleDSL.@addrules ml begin
    streamsrc(refs::Vector{RuleDSL.NodeRef}) = StreamSrc[RuleDSL.@config, refs]()
    runningaverage(ref::RuleDSL.NodeRef) = RunningAverage(ref...)
    record(ref::RuleDSL.NodeRef) = Record(ref...)
end

You might be curious how ML performance improves as more training data is added in the bagging process. The following streaming pipeline answers this kind of question. It receives one training batch at a time; for each batch a new ML model is trained, and its inference is folded into a running average of model inferences. The AUC scores computed from the running average are then recorded, showing how the score improves as more training data arrives.

stream_ddf = DataScience.DDataFrame(train_data_file, blocksize="2 MB")
stream_batches = stream_ddf.chunks

stream_src = RuleDSL.@ref ml.streamsrc(stream_batches)
down_stream = RuleDSL.@ref ml.downsample(stream_src, target, sampleratio)

# change the mapper to use the streaming data source for training
_streammapper = RuleDSL.@ref ml.batchpred(model, options, [down_stream], target)

# the shuffler/reducer remains the same as before
_shuffler = RuleDSL.@ref mr.shuffler(first, 4)
_reducer = RuleDSL.@ref mr.reducer(vcat)

streampred = RuleDSL.@ref mr.mapreduce(test_batches, _streammapper, _shuffler, _reducer, valcat)

# we use the running average to compute the ML score
streamprob = RuleDSL.@ref ml.select(streampred, :proba)
streamprobavg = RuleDSL.@ref ml.runningaverage(streamprob)
streamscore = RuleDSL.@ref ml.score(RuleDSL.@ref(ml.select(streampred, :isFraud)), streamprobavg)

# finally we record all the ML score history
streamrecord = RuleDSL.@ref ml.record(streamscore)
ml:record/ml:score/ml:select/mr:mapreduce/NodeRef[4]
# create a local graph with the default pipeline
gs7 = RuleDSL.createlocalgraph(config, RuleDSL.GenericData(), Set([streamrecord]));

# this single line of code turns the regular batch pipeline into a streaming pipeline,
# by specifying the source and sink of the stream processing
RuleDSL.initstream!(gs7, Set(hash(stream_src)), Set(hash(streamrecord)));
svg = GraphIO.postlocalgraph(gss, gs7, port, true; key="stream");
GraphIO.postsvg(svg, "ml_pipeline_7.svg")
view graph data at http://127.0.0.1:8080/ui/depgraph.html?dataurl=127.0.0.1:7138_stream

Figure 7 - Local Streaming.

# stream all the training batches data through
RuleDSL.pushpullcalc!(gs7, length(stream_batches))

# stop any further streaming, and persist the state
RuleDSL.stopstream!(gs7);

The recorded history of the AUC score clearly shows how the quality of inference improves as more training data is streamed through.

GraphVM.getdata(gs7, hash(streamrecord))
20-element Vector{Any}:
 0.7050190650053733
 0.9499122299686296
 0.9558437211008541
 0.9390098515642447
 0.9609429683553885
 0.972129209439276
 0.9728823436617101
 0.9703032705402201
 0.97440203123993
 0.9759333815204342
 0.9748465722334383
 0.976859699846071
 0.9763943627087297
 0.9783027601916094
 0.9790762690144201
 0.979416501421632
 0.9794419345434815
 0.9793908341089733
 0.9800635472076212
 0.9802133824871737

Julius streaming is fully pipelined, in that each node in the graph processes a different input data batch simultaneously, which is a lot faster than the mini-batching approach in Spark. Julius streaming also works for distributed graphs across multiple computers, again without any code changes in RuleDSL or Atoms. The developer just needs to make a different API call for distributed streaming.

gs = GraphVM.RemoteGraphProxy(config, my_domain => remoteport, balancer, GraphVM.GenericData())
GraphVM.@rpc GraphVM.workerstatus(gs)
Dict{UInt64, Pair{Float64, GraphEngine.GraphVM.WorkerStatus}} with 4 entries:
  0x18fdae7dd515c15a => 1.65314e9=>Ready
  0x248c936e6c330a93 => 1.65314e9=>Ready
  0x0588c4f52e2b6c09 => 1.65314e9=>Ready
  0x8cf52965bd043568 => 1.65314e9=>Ready
GraphVM.@remote_eval gs begin

    RuleDSL.@datom Record begin
        values::Vector = []

        function fwddata!(x::Any)
            push!(values, x)
        end
    end

    RuleDSL.@datom RunningAverage begin
        sum::DataFrame = DataFrame()
        cnt::Vector = [0]

        function fwddata!(x::DataFrame)
            if cnt[1] == 0
                append!(sum, x)
            else
                sum .+= x
            end

            cnt[1] += 1
            [ sum ./ cnt[1] ]
        end
    end

    RuleDSL.@datom StreamSrc begin
        config::RuleDSL.Config
        batchsrc::Vector{RuleDSL.NodeRef}
        idx::Vector = [0]

        function fwddata!()
            thissrc = batchsrc[idx[1] % length(batchsrc) + 1]
            atom = RuleDSL.calcop(config, thissrc)
            idx[1] += 1
            RuleDSL.fwddata!(atom)
        end
    end
end

GraphVM.waitcheckstatus(gs, RuleDSL.getconfig(config, :project));

GraphVM.@addrules gs ml begin
    streamsrc(refs::Vector{RuleDSL.NodeRef})=StreamSrc[@config, refs]()
    runningaverage(ref::RuleDSL.NodeRef)=RunningAverage(ref...)
    record(ref::RuleDSL.NodeRef)=Record(ref...)
end
GraphVM.waitcheckstatus(gs, RuleDSL.getconfig(config, :project));

streampred = RuleDSL.@ref mr.mapreduce(test_batches, _streammapper, _shuffler, _reducer, Main.valcat)
streamprob = RuleDSL.@ref ml.select(streampred, :proba)
streamprobavg = RuleDSL.@ref ml.runningaverage(streamprob)
streamscore = RuleDSL.@ref ml.score(RuleDSL.@ref(ml.select(streampred, :isFraud)), streamprobavg)
streamrecord = RuleDSL.@ref ml.record(streamscore)

# create a regular batch pipeline
GraphVM.createremotegraph(gs, Set([streamrecord]), Set([:bagpred, :splitbykey, :reducer]))

# turns it into streaming mode by specifying data source and sink
GraphVM.initstream!(gs, UInt(0), Set(hash(stream_src)), Set(hash(streamrecord)))

# stream data through
RuleDSL.pushpullcalc!(gs, Set(UInt(0)), length(stream_batches))
GraphVM.waitcheckstatus(gs, RuleDSL.getconfig(config, :project));

# finalize, no longer accept future streaming data
RuleDSL.stopstream!(gs);

svg = GraphIO.postremotegraph(gs, remoteport, true);
GraphIO.postsvg(svg, "ml_pipeline_8.svg")

Figure 8 - Distributed Streaming.

We can retrieve the distributed streaming results:

GraphVM.@rpc GraphVM.onnode(gs, :getdata, UInt[0], hash(streamrecord))
20-element Vector{Any}:
 0.8454155650331014
 0.9484706216061067
 0.9594620861942083
 0.9704635038916891
 0.9737730649843896
 0.9688014985588833
 0.9672632630685964
 0.9753062887913655
 0.9757228439832072
 0.9729776593209442
 0.9674037775539538
 0.965326387983351
 0.9658349801631017
 0.965026014851819
 0.9625683696647587
 0.9649897152764351
 0.9682083634971096
 0.9662825187989645
 0.9640438420182527
 0.9626878303963933
GraphVM.@rpc GraphVM.endcluster(gs)
0

5. Conclusions

This tutorial shows how to productionize an ML model in Julius by adding batching and distribution capabilities step by step. It takes only two additional rules (ml.bagpred and ml.batchpred) and one additional function (valcat) to turn a simple ML pipeline into a doubly batched and fully distributed one. This is remarkable considering that the doubly batched pipeline is quite complex, with 156 nodes, a big increase from the 13 nodes in the original ML pipeline.

We explained in section 3 that Julius graph distribution and execution is much more efficient than Dask or Dagger.jl, because Julius only needs to communicate a few key nodes to the workers.

Using Julius, it becomes effortless to move an ML model from development to production, since no code changes are required and the distribution is fully automatic. In comparison, to productionize an ML model using Dask, developers have to heavily and manually modify the code base using the Dask-specific API, and then go through extensive testing and performance tuning, as shown in the original Dask blog.

Furthermore, Julius' web UI offers a much easier and more intuitive visualization and navigation of all the data, logic and distribution in the computational graph, with every intermediate result fully visible to the user. In comparison, it is quite difficult to access intermediate data and distribution results from Dask, as the individual tasks in Dask are transient and their states are not persisted. One can manually record and persist intermediate results in Dask, but that requires additional coding and manual effort.

Julius is also more flexible: the same ML pipeline can run in batch, streaming or distributed modes without additional coding. Dask, on the other hand, supports only batch processing, not streaming use cases.


This page was generated using Literate.jl.