-
Notifications
You must be signed in to change notification settings - Fork 46
Open
Description
Executing the "Example: Running an iterative algorithm at scale with incremental notifications" at http://mbrace.io/starterkit/HandsOnTutorial.FSharp/examples/200-kmeans-clustering-example.html:
#load "ThespianCluster.fsx"
#load @"lib\utils.fsx"
#load @"lib\sieve.fsx"
open System
open MBrace.Core
open MBrace.Thespian
open Nessos.Streams
let cluster =
ThespianCluster.InitOnCurrentMachine( workerCount = Environment.ProcessorCount,
logger = ConsoleLogger(),
logLevel = LogLevel.Info )
let dim = 2 // point dimensions: we use 2 dimensions so we can chart the results
let numCentroids = 5 // The number of centroids to find
let partitions = 12 // The number of point partitions
let pointsPerPartition = 50000 // The number of points per partition
let epsilon = 0.1
/// Represents a multi-dimensional point.
type Point = float[]
/// Generates a set of points via a random walk from the origin, using provided seed.
let generatePoints dim numPoints seed : Point[] =
let rand = Random(seed * 2003 + 22)
let prev = Array.zeroCreate dim
let nextPoint () =
let arr = Array.zeroCreate dim
for i = 0 to dim - 1 do
arr.[i] <- prev.[i] + rand.NextDouble() * 40.0 - 20.0
prev.[i] <- arr.[i]
arr
[| for i in 1 .. numPoints -> nextPoint() |]
let randPoints = Array.init partitions (generatePoints dim pointsPerPartition)
let point2d (p:Point) = p.[0], p.[1]
[<AutoOpen>]
module KMeansHelpers =
/// Calculates the distance between two points.
let dist (p1 : Point) (p2 : Point) =
Array.fold2 (fun acc e1 e2 -> acc + pown (e1 - e2) 2) 0.0 p1 p2
/// Assigns a point to the correct centroid, and returns the index of that centroid.
let findCentroid (p: Point) (centroids: Point[]) : int =
let mutable mini = 0
let mutable min = Double.PositiveInfinity
for i = 0 to centroids.Length - 1 do
let dist = dist p centroids.[i]
if dist < min then
min <- dist
mini <- i
mini
/// Given a set of points, calculates the number of points assigned to each centroid.
let kmeansLocal (points : Point[]) (centroids : Point[]) : (int * (int * Point))[] =
let lens = Array.zeroCreate centroids.Length
let sums =
Array.init centroids.Length (fun _ -> Array.zeroCreate centroids.[0].Length)
for point in points do
let cent = findCentroid point centroids
lens.[cent] <- lens.[cent] + 1
for i = 0 to point.Length - 1 do
sums.[cent].[i] <- sums.[cent].[i] + point.[i]
Array.init centroids.Length (fun i -> (i, (lens.[i], sums.[i])))
/// Sums a collection of points
let sumPoints (pointArr : Point []) dim : Point =
let sum = Array.zeroCreate dim
for p in pointArr do
for i = 0 to dim - 1 do
sum.[i] <- sum.[i] + p.[i]
sum
/// Scalar division of a point
let divPoint (point : Point) (x : float) : Point =
Array.map (fun p -> p / x) point
let rec KMeansCloudIterate (partitionedPoints, epsilon, centroids, iteration, emit) = cloud {
// Stage 1: map computations to each worker per point partition
let! clusterParts =
partitionedPoints
|> Array.map (fun (p:CloudArray<_>, w) -> cloud { return kmeansLocal p.Value centroids }, w)
|> Cloud.ParallelOnSpecificWorkers
// Stage 2: reduce computations to obtain the new centroids
let dim = centroids.[0].Length
let newCentroids =
clusterParts
|> Array.concat
|> ParStream.ofArray
|> ParStream.groupBy fst
|> ParStream.sortBy fst
|> ParStream.map snd
|> ParStream.map (fun clp -> clp |> Seq.map snd |> Seq.toArray |> Array.unzip)
|> ParStream.map (fun (ns,points) -> Array.sum ns, sumPoints points dim)
|> ParStream.map (fun (n, sum) -> divPoint sum (float n))
|> ParStream.toArray
// Stage 3: check convergence and decide whether to continue iteration
let diff = Array.map2 dist newCentroids centroids |> Array.max
do! Cloud.Logf "KMeans: iteration [#%d], diff %A with centroids /n%A" iteration diff centroids
// emit an observation
emit(DateTimeOffset.UtcNow,iteration,diff,centroids)
if diff < epsilon then
return newCentroids
else
return! KMeansCloudIterate (partitionedPoints, epsilon, newCentroids, iteration+1, emit)
}
let KMeansCloud(points, numCentroids, epsilon, emit) = cloud {
let initCentroids = points |> Seq.concat |> Seq.take numCentroids |> Seq.toArray
let! workers = Cloud.GetAvailableWorkers()
do! Cloud.Logf "KMeans: persisting partitioned point data to store."
// Divide the points
let! partitionedPoints =
points
|> Seq.mapi (fun i p ->
local {
// always schedule the same subset of points to the same worker
// for caching performance gains
let! ca = CloudValue.NewArray(p, StorageLevel.MemoryAndDisk)
return ca, workers.[i % workers.Length] })
|> Local.Parallel
do! Cloud.Logf "KMeans: persist completed, starting iteration."
return! KMeansCloudIterate(partitionedPoints, epsilon, initCentroids, 1, emit)
}
let kmeansTask =
KMeansCloud(randPoints, numCentroids, epsilon*10000.0, ignore)
|> cluster.Run
kmeansTask.Result
yields:
INFO : Uploading 'MBrace.Flow, Version=1.5.4.0, Culture=neutral, PublicKeyToken=null' [IMG 392.50 KiB, PDB 0.51 MiB]
INFO : Uploading 'Streams, Version=0.4.1.0, Culture=neutral, PublicKeyToken=null' [IMG 338.00 KiB, PDB 421.50 KiB]
INFO : Uploading 'FSI-ASSEMBLY_d731fef8-8866-4a4a-9eb4-8e10af24184f_1, Version=0.0.0.0, Culture=neutral, PublicKeyToken=null' [IMG 58.00 KiB]
INFO : Uploading data dependency 'Double[][][] randPoints@' [8.82 MiB]
WARNING : Could not serialize data dependencies: MBrace.Thespian.ThespianCluster cluster@
INFO : Posted CloudProcess<float [] []> '7a5bbe93-7e2b-4944-b989-371111226649'.
val kmeansTask : MBrace.Runtime.CloudProcess<Point []>
System.NullReferenceException: Object reference not set to an instance of an object.
at System.Tuple`2.get_Item2()
at [email protected](Tuple`2 tupledArg)
at [email protected](Tuple`2 value)
at Nessos.Streams.Stream.bulk@174[T](T[] source, FSharpFunc`2 iterf, StreamCancellationTokenSource cts, FSharpFunc`2 complete, Unit unitVar0)
at Nessos.Streams.Stream.toArray[T](Stream`1 stream)
at [email protected](Tuple`2[][] _arg1)
at [email protected](ExecutionContext ctx, T t) in C:\Users\eirik.tsarpalis\devel\mbrace\MBrace.Core\src\MBrace.Core\Continuation\Builders.fs:line 337
--- End of stack trace from previous location where exception was thrown ---
at <StartupCode$MBrace-Runtime>[email protected](CloudProcessResult _arg2) in C:\Users\eirik.tsarpalis\devel\mbrace\MBrace.Core\src\MBrace.Runtime\Runtime\CloudProcess.fs:line 211
at [email protected](a a)
at MBrace.Core.Internals.AsyncExtensions.Async.RunSync[T](FSharpAsync`1 workflow, FSharpOption`1 cancellationToken) in C:\Users\eirik.tsarpalis\devel\mbrace\MBrace.Core\src\MBrace.Core\Utils\AsyncExtensions.fs:line 99
at <StartupCode$FSI_0007>.$FSI_0007.main@()
Stopped due to error
Metadata
Metadata
Assignees
Labels
No labels