Tutorial 6: ML Experiment Tracking and Persisting

How to use this tutorial

  • This tutorial is also available in Jupyter notebook format. To access and run the Jupyter notebook version of the tutorial, please sign up for free developer access by following instructions at https://github.com/juliustechco/juliusgraph.
  • Additional resources (video demos & blogs) are available at http://juliustech.co.
  • To report bugs or request new features, please raise an issue here. To schedule a live demo, please go to http://juliustech.co. Please check out this FAQ page or email us at info@juliustech.co for other general inquiries.
  • This tutorial is copyrighted by Julius Technologies; its use is governed by the terms of use.

Introduction

ML model experiment tracking is a common challenge for data scientists and engineers. We hear the following story too often:

“My team spent months training a massive ML model, and at some point we got excellent results. Unfortunately, we can't reproduce them any more because a number of things have changed: the data, the underlying Python library versions, the hyperparameters, and so on. We are just not sure which combination worked ...”

This highlights the serious issues around ML experiment tracking. To reproduce a past ML run exactly, three things have to be persisted and recovered, as illustrated by the sketch after this list:

  1. the runtime environment, including hardware, OS, software libraries, etc.
  2. the input data
  3. the entire code, parameters and configurations, so that the whole data/analytical pipeline can be rebuilt
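As a rough illustration (and not how Julius itself does it), the kind of metadata a data scientist would otherwise capture by hand can be collected in plain Julia. The snapshot below only covers items 1 and 2, and the data file path is simply this tutorial's training file:

using Pkg, SHA

# Illustrative only: manually snapshot the runtime environment and the input data.
env_snapshot = Dict(
    "julia_version" => string(VERSION),
    "os"            => string(Sys.KERNEL),
    "packages"      => sort([string(p.name, "@", p.version) for p in values(Pkg.dependencies()) if p.version !== nothing]),
    "data_sha256"   => Dict(f => bytes2hex(open(sha256, f)) for f in ["../data/train_fraud.csv"] if isfile(f)),
)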

Experiment tracking becomes much more challenging if the model runs in a distributed environment. To guarantee reproducibility, each ML experiment should run from a fresh environment; otherwise the data, settings or environment could change between runs. For example, some data files could be added or modified as part of the runs. However, a complete refresh of a complex distributed data and analytical pipeline is often out of the question, as it consists of many software components, parameters and configurations. This is why most existing experiment tracking solutions only persist the parts of the pipeline that are most relevant to the ML models. The downside of this approach is that the stored ML runs often fail to reproduce the exact results.

Leveraging its distributed graph computing engine, Julius offers an experiment tracking solution that can persist and recover an entire distributed data and analytical pipeline, as well as the full runtime data and environment. Julius is the only solution on the market with these capabilities.

Julius persists the model experiment with its entire data & analytical pipeline using the following simple steps; a skeleton of the corresponding API calls is sketched after the list:

  1. spin up a fresh virtual distributed environment, which only takes a few seconds
  2. run the ML experiment, record the entire session on the Julius server side, and persist the recorded session to long-term storage. The recorded session contains the step-by-step instructions to recreate the entire runtime environment, including the entire data and distributed pipeline, so that the exact state of the experiment can be recovered.
  3. the recorded ML experiment can be easily recovered by replaying it in another fresh environment.
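The skeleton below uses only GraphVM calls that are demonstrated in full in sections 2 and 3 of this tutorial; gs0, config, balancer and my_domain are the cluster handle and configuration objects created in section 1:

port   = GraphVM.drawdataport()
GraphVM.@rpc GraphVM.startlocalmasterservice(gs0, port, 4)       # 1. fresh virtual cluster
gs_rec = GraphVM.RemoteGraphProxy(config, my_domain => port, balancer, GraphVM.GenericData())
GraphVM.wait4clusterinit(gs_rec)

GraphVM.@rpc GraphVM.clearrecording!(gs_rec)
GraphVM.@rpc GraphVM.setrecording!(gs_rec, true)                 # 2. record the session ...
# ... run the ML experiment against gs_rec ...
records = GraphVM.@rpc GraphVM.getrecording(gs_rec)              #    ... and retrieve the recording

# 3. replay in another fresh cluster (gs_new is created the same way as gs_rec)
GraphVM.@rpc GraphVM.replayrecording(gs_new, records)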

In this notebook, we follow a typical workflow of a data scientist to show Julius' experiment tracking capabilities. We use an ML fraud detection model from a previous tutorial as an example. Readers are referred to the "Distributed Machine Learning pipeline" tutorial for more details on the model itself.

1. Model Development & Experiment

Data scientists usually develop ML models by running experiments interactively in a Jupyter notebook. This section defines the rules and builds the pipeline of a distributed ML model.

using GraphEngine: RuleDSL, GraphVM
using AtomExt
using DataFrames, DataScience, StatsBase, Random

newfunctions = quote
    function downsample(ycol::Symbol, frac::Float64, df::DataFrame)
        positive = DataFrames.filter(row -> isequal(row[ycol], true), df)
        negative =  DataFrames.filter(row -> isequal(row[ycol], false), df)
        dspositive = positive[sample(1:nrow(positive), round(Int, frac * nrow(positive)), replace=true), :]
        dsnegative = negative[sample(1:nrow(negative), round(Int, frac * nrow(negative)), replace=true), :]
        merged = vcat(dspositive, dsnegative)
        merged[shuffle(1:nrow(merged)), :]
    end

    function valcat(xs::Vector...)
        agg = DataFrame()
        for (k, v) in vcat(xs...)
            agg = vcat(agg, v)
        end
        agg
    end

    function dfmean(dfs::DataFrame...)
        df = reduce(.+, dfs)
        df ./ (length(dfs))
    end
end
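These helpers are only evaluated on the server later (via @remote_eval), but they can also be tried locally first. A minimal sanity check on a toy DataFrame follows; the column names and fraction are made up for the example:

eval(newfunctions)   # define downsample/valcat/dfmean in Main for a quick local test

toy   = DataFrame(isFraud=[true, false, false, true, false], amount=1.0:5.0)
small = downsample(:isFraud, 0.5, toy)                             # ~half the rows of each class, shuffled
avg   = dfmean(DataFrame(p=[0.2, 0.4]), DataFrame(p=[0.6, 0.8]))   # element-wise mean of the two frames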

newrules = quote
    select(ref::RuleDSL.NodeRef, cols::Any; label="$(isa(cols, InvertedIndex) ? "col != $(cols.skip)" : "col == $(cols)")") =
        DataScience.ApplyFn[x::DataFrame->DataFrames.select(x, cols; copycols=false)](ref...)

    classifiertrain(model::Val{:ExtraTreesClassifier}, options::Dict, trainxs::RuleDSL.NodeRef, trainy::RuleDSL.NodeRef; label="$model train")=DataScience.PyTrain["sklearn.ensemble.ExtraTreesClassifier", options](trainxs..., trainy...)
    classify(train_data::RuleDSL.NodeRef, target::Symbol, model::Val, options::Dict, testx::RuleDSL.NodeRef; label="$model inference")=begin
        train_data_X = RuleDSL.@ref ml.select(train_data, Not(target))
        train_data_y = RuleDSL.@ref ml.select(train_data, target)
        trained = RuleDSL.@ref ml.classifiertrain(model, options,  train_data_X, train_data_y )
        DataScience.PyPredict(trained..., testx...)
    end

    classifyprob(train_data::RuleDSL.NodeRef, target::Symbol, model::Val, options::Dict, test_data::RuleDSL.NodeRef; label="prob")=begin
        testx = RuleDSL.@ref ml.select(test_data, Not(target))
        DataScience.ApplyFn[x::DataFrame->DataFrames.select(x, :proba; copycols=false)](classify(train_data, target, model, options, testx))
    end

    score(realized::RuleDSL.NodeRef, probs::RuleDSL.NodeRef)=DataScience.PyScore(realized..., probs...)

    downsample(raw::RuleDSL.NodeRef, ycol::Symbol, frac::Float64)=DataScience.ApplyFn[Main.downsample, ycol, frac](raw...)

    bagpred(test::RuleDSL.NodeRef, model::Val, options::Dict, train_batches::Vector{RuleDSL.NodeRef}, target::Symbol) =
        DataScience.ApplyFn[dfmean](RuleDSL.@ref((ml.classifyprob(b, target, model, options, test) for b = train_batches))...)

    batchpred(test::RuleDSL.NodeRef, model::Val, options::Dict, train_batches::Vector{RuleDSL.NodeRef}, target::Symbol) =
        DataScience.ApplyFn[(ind, prob)->[hash(test) => hcat(ind, prob)]](select(test, target), bagpred(test, model, options, train_batches, target))
end;

We use the existing rules in the ds namespace to read CSV files from a shared drive:

train_data_file = joinpath(@__DIR__, "../data/train_fraud.csv")
test_data_file = joinpath(@__DIR__, "../data/test_fraud.csv")
train_data = RuleDSL.@ref ds.csvsrc(train_data_file, true; label="train data")
test_data  = RuleDSL.@ref ds.csvsrc(test_data_file, true; label="test data")

target = :isFraud
model = Val(:ExtraTreesClassifier)
options = Dict(:n_estimators => 10, :min_samples_leaf => 10)

test_data_y = RuleDSL.@ref ml.select(test_data, target)

sampleratio = 0.05
train_ddf = DataScience.DDataFrame(train_data_file, blocksize="5 MB")
train_batches = train_ddf.chunks
down_batches = RuleDSL.@ref(ml.downsample(b, target, sampleratio) for b in train_batches)

test_ddf = DataScience.DDataFrame(test_data_file, blocksize="2.5 MB")
test_batches = test_ddf.chunks

mapper = RuleDSL.@ref ml.batchpred(model, options, down_batches, target)
shuffler = RuleDSL.@ref mr.shuffler(first, 4)
reducer = RuleDSL.@ref mr.reducer(vcat)
mr:reducer/typeof(vcat):2837

The model now runs on a cluster for model development and produces good results, as shown below:

using GraphEngine: RuleDSL, GraphVM

config = RuleDSL.newconfig(RuleDSL.Config(), :project => "MapReduce")
balancer = GraphVM.GlobalUnique()
my_domain = "localhost"
remoteport = GraphVM.drawdataport();
gs0 = GraphVM.RemoteGraphProxy(my_domain => 7225)
GraphVM.@rpc GraphVM.startlocalmasterservice(gs0, remoteport, 4)
gs = GraphVM.RemoteGraphProxy(config, my_domain => remoteport, balancer, GraphVM.GenericData())
GraphVM.wait4clusterinit(gs)
Dict{UInt64, Pair{Float64, GraphEngine.GraphVM.WorkerStatus}} with 4 entries:
  0x75736db10e011223 => 1.65314e9=>Ready
  0x659484196e436694 => 1.65314e9=>Ready
  0x1a2f62ba5778bbef => 1.65314e9=>Ready
  0xa5261ecb6d6604fb => 1.65314e9=>Ready
GraphVM.@remote_eval gs begin
    using GraphEngine: RuleDSL, GraphVM
    using AtomExt, GraphIO
    using DataFrames, DataScience, StatsBase, Random
end

GraphVM.waitcheckstatus(gs, RuleDSL.getconfig(config, :project));

GraphVM.@remote_eval gs $newfunctions
GraphVM.waitcheckstatus(gs, RuleDSL.getconfig(config, :project));

GraphVM.@addrules gs ml $newrules

mrpred = RuleDSL.@ref mr.mapreduce(test_batches, mapper, shuffler, reducer, Main.valcat)
mrscore = RuleDSL.@ref ml.score(RuleDSL.@ref(ml.select(mrpred, :isFraud)), RuleDSL.@ref(ml.select(mrpred, :proba)))

alljobs, ds = RuleDSL.jobdeps(config, [mrscore], Set([:classifiertrain, :splitbykey, :reducer]));

GraphVM.waitcheckstatus(gs, RuleDSL.getconfig(config, :project));
GraphVM.initgraph!(gs)
GraphVM.dispatchjobs!(gs, alljobs; nocopy=Set([:splitbykey]));
using GraphIO

GraphVM.waitcheckstatus(gs, RuleDSL.getconfig(config, :project));
svg = GraphIO.postremotegraph(gs, remoteport);
GraphIO.postsvg(svg, "ml_persist_1.svg")
GraphVM.@rpc GraphVM.endcluster(gs);

Figure 1 - Original Distributed ML Pipeline.

2. Record a ML Experiment

Now that the data scientists are happy with the results, they will want to persist the experiment so that it can be reproduced later. They have made a number of choices in this ML model run, including data sources, configurations, choice of model, hyperparameters, etc. Some experiment tracking tools are based on saving notebooks, but that is not an adequate and reliable solution, as some data or variables may not be captured by the code in the notebook. For example, the developer might have read data from a local file, or relied upon the state or data of a remote server. Under those circumstances, just saving the notebook is not enough to recover the ML run.
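As a contrived illustration (the file path and the commented-out helper below are hypothetical, not part of this tutorial), a notebook cell like the following depends on state that saving the notebook does not capture:

using CSV, DataFrames

# Hypothetical cell: its result depends on a local scratch file and a remote service,
# neither of which is versioned or captured when the notebook itself is saved.
df = DataFrame(CSV.File("local_scratch/features.csv"))            # local file may change or disappear
# model = fetch_latest_model("https://some-internal-server/...")  # hypothetical remote dependency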

Julius takes a different approach: instead of saving the notebook on the client side, we persist the entire state on the server side. This server-side persistence process is easy and seamless. We first start a fresh virtual cluster on a new port, using the same Docker image as the development environment.

remoteport2 = GraphVM.drawdataport()

GraphVM.@rpc GraphVM.startlocalmasterservice(gs0, remoteport2, 4)
gs2 = GraphVM.RemoteGraphProxy(config, my_domain => remoteport2, balancer, GraphVM.GenericData())
GraphVM.wait4clusterinit(gs2)
Dict{UInt64, Pair{Float64, GraphEngine.GraphVM.WorkerStatus}} with 4 entries:
  0x2720954ff15b348e => 1.65314e9=>Ready
  0x23006862e91b5408 => 1.65314e9=>Ready
  0xa1de60fa66d6b90d => 1.65314e9=>Ready
  0x53902d08ca3a919f => 1.65314e9=>Ready

The following lines enable recording on this new cluster; all subsequent actions will be recorded on the server side.

GraphVM.@rpc GraphVM.clearrecording!(gs2)
GraphVM.@rpc GraphVM.setrecording!(gs2, true);

We now re-run the same ML model on this fresh server, where the existing local variables can be reused to recreate the same data/analytics pipeline on the server. Only a few lines of code are needed for server-side recording, as shown below:

GraphVM.@remote_eval gs2 begin
    using GraphEngine: RuleDSL, GraphVM
    using AtomExt, GraphIO
    using DataFrames, DataScience, StatsBase, Random
end

GraphVM.waitcheckstatus(gs2, RuleDSL.getconfig(config, :project));

GraphVM.@remote_eval gs2 $newfunctions
GraphVM.waitcheckstatus(gs2, RuleDSL.getconfig(config, :project));

GraphVM.@addrules gs2 ml $newrules

GraphVM.waitcheckstatus(gs2, RuleDSL.getconfig(config, :project));
GraphVM.initgraph!(gs2)
GraphVM.dispatchjobs!(gs2, alljobs; nocopy=Set([:splitbykey]));

Now we can retrieve the recording saved on the server side for this ML run. This is an extremely compact representation of the entire run, including all the data and analytical logic needed to recreate the distributed pipeline. The recording can be persisted to long-term storage such as AWS S3. The version of the Docker container being used can also be persisted along with the recording; the Docker container captures the exact and complete runtime environment.

GraphVM.waitcheckstatus(gs2, RuleDSL.getconfig(config, :project));
records = GraphVM.@rpc GraphVM.getrecording(gs2);

# terminate the recording cluster
GraphVM.@rpc GraphVM.endcluster(gs2);
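The recording can then be written to long-term storage. Below is a minimal sketch using Julia's standard Serialization module, assuming the records object is serializable; the file name is arbitrary, and uploading the file to a store such as AWS S3 (e.g. with AWSS3.jl) is not shown:

using Serialization

serialize("ml_experiment_recording.jls", records)      # persist the recorded session to disk
# ... later, possibly on another machine running the same Docker image:
records = deserialize("ml_experiment_recording.jls")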

3. Reproduce a ML Experiment

From the recording, the entire distributed pipeline can easily be recreated at a later time. To do so, we first spin up a fresh cluster using the same version of the Docker container, and then replay the recording on this new cluster. It only takes a single line of code to recover the stored pipeline:

remoteport3 = GraphVM.drawdataport()
GraphVM.@rpc GraphVM.startlocalmasterservice(gs0, remoteport3, 4)
gs3 = GraphVM.RemoteGraphProxy(config, my_domain => remoteport3, balancer, GraphVM.GenericData())
GraphVM.wait4clusterinit(gs3)
Dict{UInt64, Pair{Float64, GraphEngine.GraphVM.WorkerStatus}} with 4 entries:
  0x6ad69616f8c722df => 1.65314e9=>Ready
  0x01ffa25ca7192a74 => 1.65314e9=>Ready
  0xee623a327fde6c10 => 1.65314e9=>Ready
  0xef901ec8023a14fb => 1.65314e9=>Ready
GraphVM.@rpc GraphVM.replayrecording(gs3, records)
11

Now the results are ready to be inspected:

GraphVM.waitcheckstatus(gs3, RuleDSL.getconfig(config, :project));
svg = GraphIO.postremotegraph(gs3, remoteport3);
GraphVM.@rpc GraphVM.endcluster(gs3)
GraphIO.postsvg(svg, "ml_persist_2.svg")

Figure 2 - Recreated Distributed ML Pipeline.


This page was generated using Literate.jl.