Tutorial 4: Distributed Machine Learning
How to use this tutorial
- This tutorial is also available in Jupyter notebook format. To access and run the Jupyter notebook version of the tutorial, please sign up for free developer access by following instructions at https://github.com/juliustechco/juliusgraph.
- Additional resources (video demos & blogs) are available at http://juliustech.co.
- To report bugs or request new features, please raise an issue on the GitHub repository. To schedule a live demo, please go to http://juliustech.co. Please check out the FAQ page or email us at info@juliustech.co for other general inquiries.
- This tutorial is copyrighted by Julius Technologies; its use is governed by the terms of use.
In this tutorial, we use the Julius GraphEngine and its Domain Specific Language (RuleDSL) to build a distributed Machine Learning (ML) pipeline that can process a large volume of data in parallel for model training and inference.
Introduction
In the real world, ML training or inference data can be too large to fit into the memory of a single computer. This is a common challenge that ML engineers face when productionizing an ML model. A popular blog post, Training models when data doesn't fit in memory, describes how to use Dask (a popular Python package for distributed computing) to build distributed ML pipelines that process large training and inference data in batches.
In this tutorial, we are going to replicate the functionality described in the blog post using the Julius GraphEngine instead of Dask or Dagger.jl (a Julia package inspired by Dask). We will show how to achieve better results than the original Dask blog with considerably fewer lines of code, re-using the generic MapReduce pipeline developed in a previous tutorial. At the end of this tutorial, we compare Julius with Dask and summarize the key differences.
We use the same fraud detection dataset as the Dask blog, which originated from Kaggle. The realization of fraud is indicated by the column isFraud, which is the outcome the ML model tries to predict.
1. Simple ML Pipeline
As a baseline solution, we first build a simple ML pipeline in Julius without batching or distribution. We will then describe step by step how to adapt it to support batching and distribution. This is practical as it matches a typical ML model development workflow, where a data scientist first builds an ML model using a simple data pipeline, and then a data engineer parallelizes the implementation to handle large volumes of data. Productionizing an ML model often takes multiple iterations between data scientists and data engineers, which is one of the most time-consuming and costly steps of an ML model's life cycle. By repeating this exact process in this tutorial, we illustrate how easy it is to move an ML pipeline from development to production using Julius.
In order to match the numbers in the original Dask blog, we chose to use the same Python ML models from sklearn. Julius can interoperate and integrate with all major programming languages, including Python, C/C++, Java, R, .Net and Julia.
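The DataScience atoms introduced later wrap this interop for us; as a rough, hypothetical illustration of what a direct call to sklearn from Julia looks like (assuming PyCall.jl and scikit-learn are installed; this snippet is not part of the tutorial's pipeline):
using PyCall
# import sklearn's ensemble module via PyCall
ensemble = pyimport("sklearn.ensemble")
clf = ensemble.ExtraTreesClassifier(n_estimators=10, min_samples_leaf=10)
# toy stand-ins for the fraud features and the isFraud target
X = rand(100, 3)
y = rand(Bool, 100)
clf.fit(X, y)                       # train
probs = clf.predict_proba(X)[:, 2]  # probability of the positive (fraud) class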
The fraud detection ML pipeline, as described in the Dask blog, consists of the following steps:
- read the datasets,
- separate the features from the target variable,
- train an ML model (an ExtraTreesClassifier model),
- infer using the test data,
- compute the AUC score.
We now proceed to the implementation. First, we import the required Julia and Julius packages:
using GraphEngine: RuleDSL, GraphVM
using DataFrames, DataScience, StatsBase
using AtomExt
The following few rules define the entire ML pipeline; they are largely self-explanatory and roughly follow the steps listed above.
RuleDSL.@addrules ml begin
# selects columns `cols` from a DataFrame
select(
ref::RuleDSL.NodeRef, cols::Any;
label="$(isa(cols, InvertedIndex) ? "col != $(cols.skip)" : "col == $(cols)")"
) = begin
DataScience.ApplyFn[x::DataFrame -> DataFrames.select(x, cols; copycols=false)](ref...)
end
# any `sklearn` ML model can be easily handled by overloading the `classifiertrain`
# rule, as follows
classifiertrain(
model::Val{:ExtraTreesClassifier},
options::Dict,
trainxs::RuleDSL.NodeRef,
trainy::RuleDSL.NodeRef;
label="$model train"
) = begin
DataScience.PyTrain["sklearn.ensemble.ExtraTreesClassifier", options](trainxs..., trainy...)
end
# this rule makes the predictions
classify(
train_data::RuleDSL.NodeRef, target::Symbol, model::Val, options::Dict, testx::RuleDSL.NodeRef;
label="$model inference"
) = begin
train_data_X = RuleDSL.@ref ml.select(train_data, Not(target))
train_data_y = RuleDSL.@ref ml.select(train_data, target)
trained = RuleDSL.@ref ml.classifiertrain(model, options, train_data_X, train_data_y )
DataScience.PyPredict(trained..., testx...)
end
# makes predictions and selects the `:proba` column for the resulting DataFrame
classifyprob(
train_data::RuleDSL.NodeRef,
target::Symbol,
model::Val,
options::Dict,
test_data::RuleDSL.NodeRef;
label="prob"
) = begin
testx = RuleDSL.@ref ml.select(test_data, Not(target))
DataScience.ApplyFn[
x::DataFrame -> DataFrames.select(x, :proba; copycols=false)
](classify(train_data, target, model, options, testx))
end
# computes the AUC score
score(realized::RuleDSL.NodeRef, probs::RuleDSL.NodeRef) = begin
DataScience.PyScore(realized..., probs...)
end
end
PyTrain, PyPredict and PyScore are three useful atoms provided by the DataScience package; they wrap the training, inference and roc_auc_score methods from the sklearn Python package. The PyTrain atom is generic: it can instantiate any Python ML model from its name and parameters, as shown in the ml.classifiertrain rule.
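For example, a different sklearn model could hypothetically be supported with one more overload of the same rule (a sketch following the exact pattern above; it is not used in the rest of this tutorial):
RuleDSL.@addrules ml begin
    # hypothetical overload: adds sklearn's RandomForestClassifier via the same generic PyTrain atom
    classifiertrain(
        model::Val{:RandomForestClassifier},
        options::Dict,
        trainxs::RuleDSL.NodeRef,
        trainy::RuleDSL.NodeRef;
        label="$model train"
    ) = begin
        DataScience.PyTrain["sklearn.ensemble.RandomForestClassifier", options](trainxs..., trainy...)
    end
end
Using this set of rules, we can create a simple ML pipeline as: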
# we use existing rules in ds namespace to read CSV files from a shared drive
train_data_file = joinpath(@__DIR__, "..", "data/train_fraud.csv")
test_data_file = joinpath(@__DIR__, "..", "data/test_fraud.csv")
train_data = RuleDSL.@ref ds.csvsrc(train_data_file, true; label="train data")
test_data = RuleDSL.@ref ds.csvsrc(test_data_file, true; label="test data")
target = :isFraud
model = Val(:ExtraTreesClassifier)
options = Dict(:n_estimators => 10, :min_samples_leaf => 10)
pred = RuleDSL.@ref ml.classifyprob(train_data, target, model, options, test_data)
test_data_y = RuleDSL.@ref ml.select(test_data, target)
mlscore = RuleDSL.@ref ml.score(test_data_y, pred);
config = RuleDSL.Config()
gs1 = GraphVM.createlocalgraph(config, RuleDSL.GenericData())
GraphVM.calcfwd!(gs1, Set([mlscore]));
We have now created a simple ML training and inference pipeline without using any batching or distribution. Julius provides an easy-to-use web UI for users to navigate and visualize the resulting data and logic in the computation graph. The following code block starts a local server for the web UI so that we can retrieve the resulting data from the graph.
using GraphIO
# a container of graphs
gss = Dict{String,RuleDSL.AbstractGraphState}()
# used for WebUI display purposes
port = GraphVM.drawdataport()
@async GraphVM.startresponder(gss, port);
The Julius package GraphIO provides several convenience functions for retrieving and displaying graphs in SVG format. Users can also view the graph data interactively by clicking on the URL below to bring up the full web UI.
svg = GraphIO.postlocalgraph(gss, gs1, port, true; key="single");
GraphIO.postsvg(svg, "ml_pipeline_1.svg")
view graph data at http://127.0.0.1:8080/ui/depgraph.html?dataurl=127.0.0.1:7138_single
Figure 1 - Simple ML Pipeline.
The AUC score obtained using the complete dataset is:
RuleDSL.getdata(gs1, mlscore, 1)
0.9878845834626067
1.1 Down Sampling
Downsampling is a common technique for reducing the training data size so that training runs faster. The Dask blog implemented 5% downsampling while keeping the fraction of true fraud cases constant. We replicate the same downsampling scheme with a single Julia function and a single rule in Julius:
using Random
using StatsBase
function downsample(ycol::Symbol, frac::Float64, df::DataFrame)
# get filtered DataFrames with true/false cases for isFraud
positive = DataFrames.filter(row -> isequal(row[ycol], true), df)
negative = DataFrames.filter(row -> isequal(row[ycol], false), df)
# sample with replacement each DataFrame
dspositive = positive[sample(1:nrow(positive), round(Int, frac * nrow(positive)), replace=true), :]
dsnegative = negative[sample(1:nrow(negative), round(Int, frac * nrow(negative)), replace=true), :]
# concatenate both sampled DataFrames
merged = vcat(dspositive, dsnegative)
# shuffle rows before returning
return merged[shuffle(1:nrow(merged)), :]
end
@addrules ml begin
downsample(
raw::RuleDSL.NodeRef, ycol::Symbol, frac::Float64
) = begin
DataScience.ApplyFn[downsample, ycol, frac](raw...)
end
end
Let's test the downsampling:
sampleratio = 0.05
downsamples = RuleDSL.@ref ml.downsample(train_data, target, sampleratio)
ml:downsample/ds:csvsrc/train data
gs2 = GraphVM.createlocalgraph(config, RuleDSL.GenericData())
GraphVM.calcfwd!(gs2, Set([downsamples]))
svg = GraphIO.postlocalgraph(gss, gs2, port; key="downsample");
GraphIO.postsvg(svg, "ml_pipeline_2.svg")
view graph data at http://127.0.0.1:8080/ui/depgraph.html?dataurl=127.0.0.1:7138_downsample
Figure 2 - Down Sample
We can verify that the fraud frequency remains essentially unchanged; the small remaining difference is due to rounding.
sample_df = RuleDSL.getdata(gs2, downsamples, 1)
sum(sample_df.isFraud) / size(sample_df, 1) * 100
0.12966091705630425
# use full data set to verify
using CSV
df = CSV.read(train_data_file, DataFrames.DataFrame)
sum(df.isFraud) / size(df, 1) * 100
0.12828849784581414
It is easy to modify the existing ML pipeline to include downsampling: we just replace train_data with downsamples:
downproba = RuleDSL.@ref ml.classifyprob(downsamples, target, model, options, test_data)
downscore = RuleDSL.@ref ml.score(test_data_y, downproba)
gs3 = GraphVM.createlocalgraph(config, RuleDSL.GenericData())
GraphVM.calcfwd!(gs3, Set([downscore]))
svg = GraphIO.postlocalgraph(gss, gs3, port, true; key="downscore");
GraphIO.postsvg(svg, "ml_pipeline_3.svg")
view graph data at http://127.0.0.1:8080/ui/depgraph.html?dataurl=127.0.0.1:7138_downscore
Figure 3 - ML with Down Sample
Here, the AUC score obtained using downsampling is slightly less than that from using the full training data, as expected.
RuleDSL.getdata(gs3, downscore, 1)
0.9715159105488534
We have now built the baseline ML pipeline, where the entire training and inference data is processed all at once. An obvious downside of this implementation is that it cannot handle training or inference data that does not fit into the computer's memory. Let's now productionize the pipeline by adding batching and distribution.
2. ML Pipeline with Batching
It is a common strategy to break the training data into multiple batches and train a separate ML model on each batch. Once we have multiple trained models, we can average their inferences for better accuracy. This strategy of boosting accuracy by combining multiple trained models is commonly called "bagging". Batching and bagging are often used together, allowing large training data to be split across multiple machines and processed in parallel.
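As a toy, framework-free sketch of this batching-plus-bagging idea (all names below are hypothetical stand-ins, not part of the tutorial's pipeline): train one "model" per training batch, then average the per-model inferences.
using Statistics
# hypothetical stand-ins: a "model" is just the mean of its training batch,
# and "inference" shifts each test point by that mean
batches  = [rand(100) for _ in 1:8]
testdata = rand(50)
fitbatch(b)    = mean(b)       # per-batch training
inferone(m, x) = x .+ m        # per-model inference
models = fitbatch.(batches)                           # one model per training batch
bagged = mean(inferone(m, testdata) for m in models)  # bagged (averaged) inference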
2.1 Training Data Batching
We use the convenience type DDataFrame, provided by the Julius DataScience package, to create a vector of RuleDSL.NodeRef representing roughly equal-sized chunks of the large input CSV file.
train_ddf = DataScience.DDataFrame(train_data_file, blocksize="5 MB")
train_batches = train_ddf.chunks
down_batches = RuleDSL.@ref(ml.downsample(b, target, sampleratio) for b in train_batches)
8-element Vector{NodeRef}:
ml:downsample/dd:read_csv_blocks//home/runner/work/Tutorials/Tutorials/docs/../data/train_fraud.csv
ml:downsample/dd:read_csv_blocks//home/runner/work/Tutorials/Tutorials/docs/../data/train_fraud.csv
ml:downsample/dd:read_csv_blocks//home/runner/work/Tutorials/Tutorials/docs/../data/train_fraud.csv
ml:downsample/dd:read_csv_blocks//home/runner/work/Tutorials/Tutorials/docs/../data/train_fraud.csv
ml:downsample/dd:read_csv_blocks//home/runner/work/Tutorials/Tutorials/docs/../data/train_fraud.csv
ml:downsample/dd:read_csv_blocks//home/runner/work/Tutorials/Tutorials/docs/../data/train_fraud.csv
ml:downsample/dd:read_csv_blocks//home/runner/work/Tutorials/Tutorials/docs/../data/train_fraud.csv
ml:downsample/dd:read_csv_blocks//home/runner/work/Tutorials/Tutorials/docs/../data/train_fraud.csv
Training data batching and bagging can be easily implemented using the following single rule, which just re-uses the previous ml.classifyprob rule for each input batch and averages the outputs.
# compute the average of multiple dataframes
function dfmean(dfs::DataFrame...)
df = reduce(.+, dfs)
df ./ (length(dfs))
end
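As a quick sanity check of the averaging semantics (run in the same session, where DataFrames is already loaded; the column name is just an example):
dfmean(DataFrame(proba=[0.2, 0.4]), DataFrame(proba=[0.6, 0.8]))
# returns a 2×1 DataFrame with proba = [0.4, 0.6]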
RuleDSL.@addrules ml begin
bagpred(
test::RuleDSL.NodeRef,
model::Val,
options::Dict,
train_batches::Vector{RuleDSL.NodeRef},
target::Symbol
) = begin
refs = RuleDSL.@ref((ml.classifyprob(b, target, model, options, test) for b in train_batches))
DataScience.ApplyFn[dfmean](refs...)
end
end
bagpred = RuleDSL.@ref ml.bagpred(test_data, model, options, down_batches, target)
bagscore = RuleDSL.@ref ml.score(test_data_y, bagpred)
gs4 = GraphVM.createlocalgraph(config, RuleDSL.GenericData())
GraphVM.calcfwd!(gs4, Set([bagscore]))
svg = GraphIO.postlocalgraph(gss, gs4, port; key="ml");
GraphIO.postsvg(svg, "ml_pipeline_4.svg")
view graph data at http://127.0.0.1:8080/ui/depgraph.html?dataurl=127.0.0.1:7138_ml
Figure 4 - Batching & Bagging in Training Data
The AUC score obtained using multiple samples of the data is:
RuleDSL.getdata(gs4, bagscore, 1)
0.95354937716486
2.2 Batching both Training and Prediction Data
In the previous implementation, the training data is batched but not the inference data. In practice, the inference data could also be too large to fit into the memory of a single machine, in which case we need to batch it as well. Batching both training and inference data is a considerably more complex process, but in Julius we can leverage the generic MapReduce pattern to define this complicated pipeline with very little code.
We first use DataScience.DDataFrame to create a vector of RuleDSL.NodeRef for the inference data.
test_ddf = DataScience.DDataFrame(test_data_file, blocksize="2.5 MB")
test_batches = test_ddf.chunks
4-element Vector{NodeRef}:
dd:read_csv_blocks//home/runner/work/Tutorials/Tutorials/docs/../data/test_fraud.csv
dd:read_csv_blocks//home/runner/work/Tutorials/Tutorials/docs/../data/test_fraud.csv
dd:read_csv_blocks//home/runner/work/Tutorials/Tutorials/docs/../data/test_fraud.csv
dd:read_csv_blocks//home/runner/work/Tutorials/Tutorials/docs/../data/test_fraud.csv
The doubly batched training and inference ML pipeline naturally maps to a MapReduce pipeline as follows (a toy sketch of the three stages appears right after this list):
- mapper: computes the bagged inference of a single test batch from multiple trained models. This stage already includes the training data batching.
- shuffler/reducer: moves the individual batch inferences and concatenates them to form the full inference.
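As a plain-Julia illustration of these three stages (hypothetical stand-ins only; the actual pipeline uses the mr.* rules below), the mapper emits key => value pairs, the shuffler buckets them by key into a fixed number of pipelines, and the final aggregation concatenates the pipelines:
batches = [rand(10) for _ in 1:8]
mapped  = [hash(b) => sum(b) for b in batches]                     # map: one keyed result per batch
buckets = [[p for p in mapped if first(p) % 4 == i] for i in 0:3]  # shuffle: 4 pipelines bucketed by key
final   = reduce(vcat, buckets)                                    # aggregate: concatenate all pipelines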
The following batchpred rule extracts the realization and the inference for the same test batch file into a DataFrame, and assigns it a unique key using the hash of the batch file's NodeRef.
RuleDSL.@addrules ml begin
# extract both realization and prob predictions
batchpred(
test::RuleDSL.NodeRef,
model::Val,
options::Dict,
train_batches::Vector{RuleDSL.NodeRef},
target::Symbol
) = begin
DataScience.ApplyFn[
(ind, prob)->[hash(test) => hcat(ind, prob)]
](select(test, target), bagpred(test, model, options, train_batches, target))
end
end
# extracts the DataFrames from `batchpred` from all batches and concatenates them
function valcat(xs::Vector...)
agg = DataFrame()
for (_, v) in vcat(xs...)
agg = vcat(agg, v)
end
return agg
end
valcat (generic function with 1 method)
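For example, with made-up keys and values, valcat strips the keys and stacks the DataFrames:
valcat([0x01 => DataFrame(proba=[0.1, 0.2])], [0x02 => DataFrame(proba=[0.3])])
# returns a 3×1 DataFrame with proba = [0.1, 0.2, 0.3]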
The following is the entire definition of the doubly batched ML pipeline using the MapReduce pattern:
_mapper = RuleDSL.@ref ml.batchpred(model, options, down_batches, target)
# shuffle all batches into 4 pipelines before reducing
_shuffler = RuleDSL.@ref mr.shuffler(first, 4)
# simply concatenates all the vectors in a given pipeline
_reducer = RuleDSL.@ref mr.reducer(vcat)
# valcat extracts the DataFrame from each batch and concatenates them together
mrpred = RuleDSL.@ref mr.mapreduce(test_batches, _mapper, _shuffler, _reducer, valcat)
mr:mapreduce/NodeRef[4]
The last step in MapReduce is to aggregate the mapper/shuffler results of the individual test batches. The generic mr.mapreduce rule takes an optional function as its last parameter to customize this aggregation. Here, the function valcat performs the aggregation: it concatenates the individual batches' fraud realizations and inferences into a single DataFrame.
mrscore = RuleDSL.@ref ml.score(RuleDSL.@ref(ml.select(mrpred, :isFraud)), RuleDSL.@ref(ml.select(mrpred, :proba)))
gs5 = GraphVM.createlocalgraph(config, RuleDSL.GenericData())
GraphVM.calcfwd!(gs5, Set([mrscore]))
svg = GraphIO.postlocalgraph(gss, gs5, port; key="mapred");
GraphIO.postsvg(svg, "ml_pipeline_5.svg")
view graph data at http://127.0.0.1:8080/ui/depgraph.html?dataurl=127.0.0.1:7138_mapred
Figure 5 - Doubly Batching in Training and Inference
The corresponding AUC score from the doubly batched pipeline is similar:
RuleDSL.getdata(gs5, mrscore, 1)
0.9906256229475517
3. Distributed ML Pipeline
We have now adapted the simple ML pipeline to support batching of both the training and inference data. However, the real benefits of batching come from distributing the data and computation to multiple computers, so that large volumes of data can be processed in parallel.
Using Julius, it is effortless to distribute the batched pipeline to multiple computers and run it in parallel. Let's use the doubly batched ML pipeline as an example. We first connect to a remote cluster with 4 worker instances, and import the necessary packages on the remote cluster:
using GraphEngine: RuleDSL, GraphVM
config = RuleDSL.newconfig(RuleDSL.Config(), :project => "MapReduce")
balancer = GraphVM.GlobalUnique()
my_domain = GraphVM.mydomain()
# draw a port number to start the local cluster service
remoteport = GraphVM.drawdataport()
7455
# start a local master service at the given port, which mimics the effects of a remote cluster
gs0 = GraphVM.RemoteGraphProxy(my_domain => 7225)
GraphVM.@rpc GraphVM.startlocalmasterservice(gs0, remoteport, 4)
gs = GraphVM.RemoteGraphProxy(config, my_domain => remoteport, balancer, GraphVM.GenericData())
GraphVM.wait4clusterinit(gs)
Dict{UInt64, Pair{Float64, GraphEngine.GraphVM.WorkerStatus}} with 4 entries:
0x18fdae7dd515c15a => 1.65314e9=>Ready
0x248c936e6c330a93 => 1.65314e9=>Ready
0x0588c4f52e2b6c09 => 1.65314e9=>Ready
0x8cf52965bd043568 => 1.65314e9=>Ready
GraphVM.@remote_eval gs begin
using GraphEngine: RuleDSL, GraphVM
using DataScience, Random, AtomExt, GraphIO
using StatsBase, DataFrames
end
GraphVM.waitcheckstatus(gs, RuleDSL.getconfig(config, :project));
We now load the entire doubly batched ML pipeline onto the remote cluster by sending the code we have written so far to it. The following is the complete code needed to replicate the Dask blog in Julius: only 8 rules and about 50 lines of code in total.
GraphVM.@remote_eval gs begin
function downsample(ycol::Symbol, frac::Float64, df::DataFrame)
positive = DataFrames.filter(row -> isequal(row[ycol], true), df)
negative = DataFrames.filter(row -> isequal(row[ycol], false), df)
dspositive = positive[sample(1:nrow(positive), round(Int, frac * nrow(positive)), replace=true), :]
dsnegative = negative[sample(1:nrow(negative), round(Int, frac * nrow(negative)), replace=true), :]
merged = vcat(dspositive, dsnegative)
return merged[shuffle(1:nrow(merged)), :]
end
function dfmean(dfs::DataFrame...)
df = reduce(.+, dfs)
return df ./ (length(dfs))
end
function valcat(xs::Vector...)
agg = DataFrame()
for (_, v) in vcat(xs...)
agg = vcat(agg, v)
end
return agg
end
end
GraphVM.waitcheckstatus(gs, RuleDSL.getconfig(config, :project));
GraphVM.@addrules gs ml begin
select(ref::RuleDSL.NodeRef, cols::Any; label="$(isa(cols, InvertedIndex) ? "col != $(cols.skip)" : "col == $(cols)")") = DataScience.ApplyFn[x::DataFrame->DataFrames.select(x, cols; copycols=false)](ref...)
classifiertrain(model::Val{:ExtraTreesClassifier}, options::Dict, trainxs::RuleDSL.NodeRef, trainy::RuleDSL.NodeRef; label="$model train") = DataScience.PyTrain["sklearn.ensemble.ExtraTreesClassifier", options](trainxs..., trainy...)
classify(train_data::RuleDSL.NodeRef, target::Symbol, model::Val, options::Dict, testx::RuleDSL.NodeRef; label="$model inference") = begin
train_data_X = RuleDSL.@ref ml.select(train_data, Not(target))
train_data_y = RuleDSL.@ref ml.select(train_data, target)
trained = RuleDSL.@ref ml.classifiertrain(model, options, train_data_X, train_data_y )
DataScience.PyPredict(trained..., testx...)
end
classifyprob(train_data::RuleDSL.NodeRef, target::Symbol, model::Val, options::Dict, test_data::RuleDSL.NodeRef; label="prob") = begin
testx = RuleDSL.@ref ml.select(test_data, Not(target))
DataScience.ApplyFn[x::DataFrame->DataFrames.select(x, :proba; copycols=false)](classify(train_data, target, model, options, testx))
end
score(realized::RuleDSL.NodeRef, probs::RuleDSL.NodeRef)=DataScience.PyScore(realized..., probs...)
downsample(raw::RuleDSL.NodeRef, ycol::Symbol, frac::Float64)=DataScience.ApplyFn[Main.downsample, ycol, frac](raw...)
bagpred(test::RuleDSL.NodeRef, model::Val, options::Dict, train_batches::Vector{RuleDSL.NodeRef}, target::Symbol) = DataScience.ApplyFn[Main.dfmean](RuleDSL.@ref((ml.classifyprob(b, target, model, options, test) for b = train_batches))...)
batchpred(test::RuleDSL.NodeRef, model::Val, options::Dict, train_batches::Vector{RuleDSL.NodeRef}, target::Symbol) = DataScience.ApplyFn[(ind, prob)->[hash(test) => hcat(ind, prob)]](select(test, target), bagpred(test, model, options, train_batches, target))
end
GraphVM.waitcheckstatus(gs, RuleDSL.getconfig(config, :project));
Afterwards, we can create and run the doubly batched pipeline on the cluster.
mrpred = RuleDSL.@ref mr.mapreduce(test_batches, _mapper, _shuffler, _reducer, Main.valcat)
mrscore = RuleDSL.@ref ml.score(RuleDSL.@ref(ml.select(mrpred, :isFraud)), RuleDSL.@ref(ml.select(mrpred, :proba)))
# select all the nodes with rule names classifiertrain, splitbykey and reducer as the key jobs
keyjobs, ds = RuleDSL.jobdeps(config, [mrscore], [:classifiertrain, :splitbykey, :reducer]);
GraphVM.initgraph!(gs)
# distribute the key jobs to the workers
GraphVM.dispatchjobs!(gs, keyjobs; nocopy=Set([:splitbykey]))
GraphVM.waitcheckstatus(gs, RuleDSL.getconfig(config, :project));
In the code block above, we first find keyjobs, a vector of NodeRef holding all the nodes in the graph associated with the rule names classifiertrain, splitbykey and reducer. These nodes are the main points of computation and data aggregation. A developer can use their domain knowledge of the specific workload to select the best set of rule names for the key jobs. The last key job is always the final node that the user wants to compute, which is mrscore in the cell above. Once these key jobs (or key nodes) are determined, we send them to the workers by calling GraphVM.dispatchjobs!. The optional nocopy parameter specifies rule names whose nodes shall not be copied over the network; it is meant for rules/nodes holding large amounts of data that are expensive to copy. If a node is designated as nocopy, the Julius Rule Engine automatically places its immediate dependencies on the same worker to avoid the data copy. In the example above, the splitbykey nodes were flagged as nocopy to ensure the mapping results from a given input batch exist on only one worker, in order to conserve memory.
The way Julius distributes jobs to workers is fundamentally different from Dask or Dagger.jl. In Dask or Dagger.jl, every node in the graph represents a separate task that must be sent to a worker for execution individually. In Julius, since every worker holds the entire graph logic as defined by the RuleDSL, the dispatcher does not need to send every node to the workers; instead, only a few key nodes are sent as a list of NodeRef objects. Once a worker receives a job definition as a NodeRef, it can create any dependent nodes from the shared RuleDSL definitions. As a result, Julius requires much less communication between the dispatcher and the workers, and thus incurs much less overhead.
The following cell shows that only 17 of the 156 nodes in the graph need to be sent to the workers. GraphVM.@rpc is a convenience macro that facilitates a remote procedure call via a GraphVM.RemoteGraphProxy object.
gstats = GraphVM.@rpc GraphIO.graphstats(gs)
println("length of keyjobs = ", length(keyjobs))
println("graph node cnt = ", gstats[:cnt])
length of keyjobs = 17
graph node cnt = 156
By creating dependent nodes at the workers, Julius also achieves much better locality for graph execution, as all the dependent nodes created by a worker are local to it. The Julius graph distribution and execution are therefore much more efficient than Dask's or Dagger.jl's. Julius can easily distribute graphs with hundreds of millions of nodes, and even in these cases the number of key nodes to distribute is rarely more than a few thousand. In comparison, Dask and Dagger.jl suffer significant overhead on large graphs because every single node must be distributed; developers are accordingly advised to "avoid large graphs" when using Dask, while Julius does not suffer from such limitations.
svg = GraphIO.postremotegraph(gs, remoteport; maxnode=UInt(200));
GraphIO.postsvg(svg, "ml_pipeline_6.svg")
Figure 6 - Distributed ML Pipeline.
Data from the remote cluster is easily accessible using Julius' remote RPC interface.
GraphVM.@rpc GraphVM.onnode(gs, :getdata, UInt[0], hash(mrscore), 1)
0.9793635977178947
The resulting graph uses different colors to indicate the placement of nodes to individual worker instances, where a single color represents a given physical worker instance. There is a network data transfer for every arrow connecting two nodes of different colors. The entire graph distribution is handled by Julius automatically without the need for the developer to change a single line of code in RuleDSL or Atom definitions.
Upon close examination, we observe that the resulting MapReduce distribution is optimal: the workload is evenly distributed amongst the 4 workers, and there is no unnecessary data transfer between different physical computers (represented by different colors) in the resulting graph distribution.
Neither the training nor the inference input data was ever aggregated onto a single computer, so the pipeline stays within an individual worker's memory limit. Only the realization and inference outputs were aggregated onto a single machine, and these are just two columns of data, a tiny fraction of the entire inference input data set.
4. Streaming
Besides batching, streaming is another effective strategy to minimize memory usage. Instead of processing the entire data set at once, we could break it into multiple batches and process them sequentially in time. Streaming was not implemented in the original Dask blog, as the DAG created by Dask does not support streaming.
In contrast, any pipeline created in Julius can easily run in streaming mode with a few lines of code. In streaming mode, the RuleDSL.fwddata! method is called multiple times with different streaming inputs. Here, we implement a few generic Atom types that act as the stream source, running average and recorder, so that we can express the rich logic and behaviors of stream processing.
# records the streaming value x
RuleDSL.@datom Record begin
values::Vector = []
function fwddata!(x::Any)
push!(values, x)
end
end
# computes the running average of all the values being streamed
RuleDSL.@datom RunningAverage begin
sum::DataFrame = DataFrame()
cnt::Vector = [0]
function fwddata!(x::DataFrame)
if cnt[1] == 0
append!(sum, x)
else
sum .+= x
end
cnt[1] += 1
[ sum ./ cnt[1] ]
end
end
# sequentially returns values as represented in batchsrc, one per fwddata! call
# this only works for a source node, i.e., a NodeRef without any further dependencies
RuleDSL.@datom StreamSrc begin
config::RuleDSL.Config
batchsrc::Vector{RuleDSL.NodeRef}
idx::Vector = [0]
function fwddata!()
thissrc = batchsrc[idx[1] % length(batchsrc) + 1]
atom = RuleDSL.calcop(config, thissrc)
idx[1] += 1
RuleDSL.fwddata!(atom)
end
end
fwddata! (generic function with 93 methods)
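As a quick plain-Julia check of the recurrence that RunningAverage implements (not part of the pipeline): after the n-th input, it returns the element-wise sum of all inputs so far divided by n.
xs = [1.0, 2.0, 3.0, 4.0]            # stand-ins for the streamed values
accumulate(+, xs) ./ eachindex(xs)   # running averages: [1.0, 1.5, 2.0, 2.5]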
We then add a few generic high-level rules that connect these atoms as the streaming source, running average and recorder:
RuleDSL.@addrules ml begin
streamsrc(refs::Vector{RuleDSL.NodeRef}) = StreamSrc[RuleDSL.@config, refs]()
runningaverage(ref::RuleDSL.NodeRef) = RunningAverage(ref...)
record(ref::RuleDSL.NodeRef) = Record(ref...)
end
You might be curious how the ML performance improves as more training data is added to the bagging process. The following streaming pipeline answers exactly this kind of question. It receives the training data batches one at a time: for each batch, a new ML model is trained, and its inference is folded into a running average of model inferences. The ML score computed from the running-average inference is recorded at every step, showing how the AUC score improves with more training data.
stream_ddf = DataScience.DDataFrame(train_data_file, blocksize="2 MB")
stream_batches = stream_ddf.chunks
stream_src = RuleDSL.@ref ml.streamsrc(stream_batches)
down_stream = RuleDSL.@ref ml.downsample(stream_src, target, sampleratio)
# change the mapper to use the streaming data source for training
_streammapper = RuleDSL.@ref ml.batchpred(model, options, [down_stream], target)
# the shuffler/reducer remains the same as before
_shuffler = RuleDSL.@ref mr.shuffler(first, 4)
_reducer = RuleDSL.@ref mr.reducer(vcat)
streampred = RuleDSL.@ref mr.mapreduce(test_batches, _streammapper, _shuffler, _reducer, valcat)
# we use the running average to compute the ML score
streamprob = RuleDSL.@ref ml.select(streampred, :proba)
streamprobavg = RuleDSL.@ref ml.runningaverage(streamprob)
streamscore = RuleDSL.@ref ml.score(RuleDSL.@ref(ml.select(streampred, :isFraud)), streamprobavg)
# finally we record all the ML score history
streamrecord = RuleDSL.@ref ml.record(streamscore)
ml:record/ml:score/ml:select/mr:mapreduce/NodeRef[4]
# create a local graph with the default pipeline
gs7 = RuleDSL.createlocalgraph(config, RuleDSL.GenericData(), Set([streamrecord]));
# this single line of code turns the regular batch pipeline into a streaming pipeline,
# by specifying the source and sink of the stream processing
RuleDSL.initstream!(gs7, Set(hash(stream_src)), Set(hash(streamrecord)));
svg = GraphIO.postlocalgraph(gss, gs7, port, true; key="stream");
GraphIO.postsvg(svg, "ml_pipeline_7.svg")
view graph data at http://127.0.0.1:8080/ui/depgraph.html?dataurl=127.0.0.1:7138_stream
Figure 7 - Local Streaming.
# stream all the training data batches through
RuleDSL.pushpullcalc!(gs7, length(stream_batches))
# stop any further streaming, and persist the state
RuleDSL.stopstream!(gs7);
The recorded history of ML scores clearly shows how much the quality of inference improves with more training data.
GraphVM.getdata(gs7, hash(streamrecord))
20-element Vector{Any}:
0.7050190650053733
0.9499122299686296
0.9558437211008541
0.9390098515642447
0.9609429683553885
0.972129209439276
0.9728823436617101
0.9703032705402201
0.97440203123993
0.9759333815204342
0.9748465722334383
0.976859699846071
0.9763943627087297
0.9783027601916094
0.9790762690144201
0.979416501421632
0.9794419345434815
0.9793908341089733
0.9800635472076212
0.9802133824871737
Julius streaming is fully pipelined: each node in the graph processes a different input data batch simultaneously, which is a lot faster than the mini-batching approach in Spark. Julius streaming also works for graphs distributed across multiple computers, again without any code changes in RuleDSL or Atoms; the developer just needs to make a different API call for distributed streaming.
gs = GraphVM.RemoteGraphProxy(config, my_domain => remoteport, balancer, GraphVM.GenericData())
GraphVM.@rpc GraphVM.workerstatus(gs)
Dict{UInt64, Pair{Float64, GraphEngine.GraphVM.WorkerStatus}} with 4 entries:
0x18fdae7dd515c15a => 1.65314e9=>Ready
0x248c936e6c330a93 => 1.65314e9=>Ready
0x0588c4f52e2b6c09 => 1.65314e9=>Ready
0x8cf52965bd043568 => 1.65314e9=>Ready
GraphVM.@remote_eval gs begin
RuleDSL.@datom Record begin
values::Vector = []
function fwddata!(x::Any)
push!(values, x)
end
end
RuleDSL.@datom RunningAverage begin
sum::DataFrame = DataFrame()
cnt::Vector = [0]
function fwddata!(x::DataFrame)
if cnt[1] == 0
append!(sum, x)
else
sum .+= x
end
cnt[1] += 1
[ sum ./ cnt[1] ]
end
end
RuleDSL.@datom StreamSrc begin
config::RuleDSL.Config
batchsrc::Vector{RuleDSL.NodeRef}
idx::Vector = [0]
function fwddata!()
thissrc = batchsrc[idx[1] % length(batchsrc) + 1]
atom = RuleDSL.calcop(config, thissrc)
idx[1] += 1
RuleDSL.fwddata!(atom)
end
end
end
GraphVM.waitcheckstatus(gs, RuleDSL.getconfig(config, :project));
GraphVM.@addrules gs ml begin
streamsrc(refs::Vector{RuleDSL.NodeRef})=StreamSrc[@config, refs]()
runningaverage(ref::RuleDSL.NodeRef)=RunningAverage(ref...)
record(ref::RuleDSL.NodeRef)=Record(ref...)
end
GraphVM.waitcheckstatus(gs, RuleDSL.getconfig(config, :project));
streampred = RuleDSL.@ref mr.mapreduce(test_batches, _streammapper, _shuffler, _reducer, Main.valcat)
streamprob = RuleDSL.@ref ml.select(streampred, :proba)
streamprobavg = RuleDSL.@ref ml.runningaverage(streamprob)
streamscore = RuleDSL.@ref ml.score(RuleDSL.@ref(ml.select(streampred, :isFraud)), streamprobavg)
streamrecord = RuleDSL.@ref ml.record(streamscore)
# create a regular batch pipeline
GraphVM.createremotegraph(gs, Set([streamrecord]), Set([:bagpred, :splitbykey, :reducer]))
# turns it into streaming mode by specifying data source and sink
GraphVM.initstream!(gs, UInt(0), Set(hash(stream_src)), Set(hash(streamrecord)))
# stream data through
RuleDSL.pushpullcalc!(gs, Set(UInt(0)), length(stream_batches))
GraphVM.waitcheckstatus(gs, RuleDSL.getconfig(config, :project));
# finalize, no longer accept future streaming data
RuleDSL.stopstream!(gs);
svg = GraphIO.postremotegraph(gs, remoteport, true);
GraphIO.postsvg(svg, "ml_pipeline_8.svg")
Figure 8 - Distributed Streaming.
We can retrieve the distributed streaming results:
GraphVM.@rpc GraphVM.onnode(gs, :getdata, UInt[0], hash(streamrecord))
20-element Vector{Any}:
0.8454155650331014
0.9484706216061067
0.9594620861942083
0.9704635038916891
0.9737730649843896
0.9688014985588833
0.9672632630685964
0.9753062887913655
0.9757228439832072
0.9729776593209442
0.9674037775539538
0.965326387983351
0.9658349801631017
0.965026014851819
0.9625683696647587
0.9649897152764351
0.9682083634971096
0.9662825187989645
0.9640438420182527
0.9626878303963933
GraphVM.@rpc GraphVM.endcluster(gs)
0
5. Conclusions
This tutorial shows, step by step, how to productionize an ML model by adding batching and distribution capabilities in Julius. It takes only two additional rules, ml.bagpred and ml.batchpred, and one additional function, valcat, to turn a simple ML pipeline into a doubly batched and fully distributed ML pipeline. This is remarkable considering that the doubly batched pipeline is quite complex, with 156 nodes, a big increase from the 13 nodes in the original ML pipeline.
We explained in section 3 that Julius graph distribution and execution are much more efficient than Dask's or Dagger.jl's, because Julius only needs to communicate a few key nodes to the workers.
Using Julius, it becomes effortless to move an ML model from development to production, since doing so requires no code changes and the distribution is fully automatic. In comparison, to productionize an ML model using Dask, developers have to modify the code base heavily by hand using the Dask-specific API, and then go through extensive testing and performance tuning, as shown in the original Dask blog.
Furthermore, Julius' web UI offers a much easier and more intuitive visualization and navigation of all the data, logic and distribution in the computational graph, with every intermediate result fully visible to the user. In comparison, it is quite difficult to access intermediate data and distribution results from Dask, as the individual tasks in Dask are transient and their states are not persisted. One can manually record and persist intermediate results from Dask, but that requires additional coding and manual effort.
Julius is also more flexible, as the same ML pipeline can run in batch, streaming or distributed mode without additional coding. Dask, on the other hand, supports only batch processing, not the streaming use cases.
This page was generated using Literate.jl.