
Concurrency in Julia

November 9, 2021

This article was contributed by Lee Phillips

The Julia programming language has its roots in high-performance scientific computing, so it is no surprise that it has facilities for concurrent processing. Those features are not well-known outside of the Julia community, though, so it is interesting to see the different types of parallel and concurrent computation that the language supports. In addition, the upcoming release of Julia version 1.7 brings an improvement to the language's concurrent-computation palette, in the form of "task migration".

Multithreading

Julia supports a variety of styles of concurrent computation. A multithreaded computation is a type of concurrency that involves simultaneous work on different processing units. Multithreading, like the related term "parallel computation", usually implies that the various threads have access to the same main memory; it therefore involves multiple cores or processing units within a single computer.

When Julia is started without the -t flag, it uses only one thread. Passing -t auto gives it access to all of the logical threads on the machine. That is often not optimal; a better choice can be -t n, where n is the number of physical cores (the lscpu command will report both the physical-core and logical-CPU counts). For example, the popular Intel Core processors provide two logical cores for each physical core, doubling up using a technique called hyperthreading, which can yield anything from a modest speedup to an actual slowdown, depending on the type of calculation.
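
As a quick check (not shown in the article), the Threads.nthreads() function reports how many threads the running session has; started with julia -t2, it returns:

    julia> Threads.nthreads()
    2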

To demonstrate multithreading in Julia, we will use the function below that tests whether a number is prime. As with all of the examples in the article, isprime() leaves many potential optimizations on the table in the interests of simplicity.

    function isprime(x::Int)
        if x < 2
            return false
        end
        u = isqrt(x) # integer square root
        for n in 2:u
            if x % n == 0
                return false
            end
        end
        return true
    end

    function isprime(x::Any)
        return "Integers only, please"
    end

The function should look only at integers (Int is an alias for Int64 on 64-bit systems), so we've taken advantage of multiple dispatch to create two methods: one that tests for primality and one that complains if we give it anything besides an integer. The most specific applicable method is always dispatched, so this lets us handle incorrect data types without writing explicit tests in the function.
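
A quick REPL session shows the dispatch in action; given the definitions above, calls with an integer reach the primality test, while anything else falls through to the catch-all method:

    julia> isprime(7)
    true

    julia> isprime(8)
    false

    julia> isprime("7")
    "Integers only, please"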

A useful package for multithreading that performed well in my tests is Folds. This is not in the standard library so you must install it using Julia's package manager. The Folds package provides a small collection of high-level macros and functions for concurrency. The name derives from the use of "fold" to mean a reduction over an array. Folds provides multithreaded versions of map(), sum(), maximum(), minimum(), reduce(), collect(), and a handful of other automatically parallelized operations over collections.

Here we use map() to check a range of numbers for primality. If f() is a function of one variable and A is an array, the expression map(f, A) applies f() to each element of A, returning an array with the same shape as A. Replacing a map() operation with a multithreaded one is as easy as this:

    julia> @btime map(isprime, 1000:9999999);
      8.631 s (2 allocations: 9.54 MiB)

    julia> @btime Folds.map(isprime, 1000:9999999);
      5.406 s (45 allocations: 29.48 MiB)

In Julia, the syntax a:n:b defines an iterator called a "range" that steps from a to b by n; with the step omitted, as in a:b, it defaults to 1. For the map() call, the result over a range is an ordinary vector, which accounts for the memory allocation shown above.
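
For example (a small illustration, not from the article), collect() turns a stepped range into the vector of its elements:

    julia> collect(1:3:10)
    4-element Vector{Int64}:
      1
      4
      7
     10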

Folds.map() divides the array into equal segments, one for each available thread, and performs the map() operation in parallel over each segment. As with all the experiments in this article, the timings were performed using Julia version 1.7rc1. For this test I started the REPL with julia -t2.

The @btime macro, from the BenchmarkTools package, runs the job several times and reports the minimum run time along with the memory allocated. The timings indicate a speedup not all that far from the ideal factor of two on my two-core machine, at the cost of higher memory consumption. A CPU-usage meter confirms that only one core was working during the first calculation, while two were busy during the second.

Another form of multithreading is the use of graphics processing units (GPUs). Originally designed to accelerate the inherently parallel graphics calculations in 3D games and other graphics-heavy applications, GPUs, with their hundreds or thousands of floating-point processors, are now used as array coprocessors for a wide variety of applications. The best candidates for GPU computing are inherently parallel calculations with a large ratio of arithmetic to memory operations. There is an organization called JuliaGPU that serves as an umbrella for Julia packages that implement or rely on this style of parallel processing.

Map operations are inherently parallel; they involve a function acting on each element of a collection in isolation, neither reading from nor writing to other elements. Because of this, conventional maps can be replaced with multithreaded versions without having to change anything else in the program. The complications that come with simultaneous access to the same data don't arise.

Real-life programs will contain parts that can be easily parallelized, others where greater care must be taken, and parts that can not be parallelized at all. Consider a simplistic simulation of a galaxy, with bodies moving under the influence of their mutual gravitational forces. The force on each body must be calculated by looking at the positions of all the others (out to some distance, depending on the accuracy sought); although this part of the calculation can be parallelized, the pattern is more complicated than the map() patterns considered above. But after all the forces are stored, each body's change in velocity can be calculated independently of the others, in an easily parallelizable procedure with the structure of a simple map().

Julia has all the facilities for the more complex forms of concurrent computation. A for loop can be made multithreaded using the @threads macro this way:

    Threads.@threads for i = 1:N
        <do something>
    end

In this case, if we have two threads, for example, one thread will get the iterations from 1 to N/2 and the other from N/2+1 to N. If more than one thread needs access to the same data, that data must be protected by locking, which is provided by the lock() function. Julia also prevents some race conditions through its Atomic data type: data of this type can only be changed through a set of atomic operations, such as Threads.atomic_add!() and Threads.atomic_sub!() in place of normal addition and subtraction.
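
A minimal sketch of these two protection mechanisms (the variable names and counts are mine, not from the article): an atomic counter is updated with atomic_add!(), while a shared array is guarded with a lock:

    counter = Threads.Atomic{Int}(0)
    results = Int[]
    lk = ReentrantLock()

    Threads.@threads for i in 1:1000
        Threads.atomic_add!(counter, 1)  # race-free increment of the shared counter
        if isprime(i)                    # isprime() as defined earlier
            lock(lk) do                  # only one thread may push at a time
                push!(results, i)
            end
        end
    end

    counter[]          # 1000
    length(results)    # 168, the number of primes up to 1000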

Distributed processing

A different style of concurrency involves the cooperatively multitasking asynchronous coroutines that Julia calls Tasks. These coroutines can share a single thread, be distributed among threads, or be assigned to different nodes in a cluster or to geographically distributed machines communicating over the internet. The distributed forms of computing share the characteristic that the processes do not have direct access to the same memory, so data must be transported to the places where computation is to happen.

Tasks are well suited to the situation where a program needs to start a process that may take a while, but the program can continue because it doesn't need the result immediately. Examples are downloading data from the internet, copying a file, or performing a long-running calculation whose result is not needed until later.

When Julia is invoked with the -p n flag, n worker processes are initialized, in addition to the main executive process. The two multiprocessing flags can be combined, as in julia -p2 -t2. This would start up with two worker processes, each of which can compute with two threads. For the examples in this section, I used julia -p2.

The "manual" way to launch a task is by using the macro call @spawnat :any expr. This assigns a task to compute the expression expr on a worker process chosen by the system. The worker processes are identified by integers starting with 1; using an integer in place of :any spawns the task in that specific process.

[The Leibniz sum: π = 4 (1 − 1/3 + 1/5 − 1/7 + ⋯)]

More common is the use of a distributed version of map() called pmap(), which is imported by the -p flag. pmap() performs the same calculation as map(), but distributes the elements of the input among the worker processes, where the function applications run in parallel; it then gathers and assembles the results. For an example calculation we'll estimate π using the notoriously slowly converging Leibniz sum (seen above).

The function leibniz(first, last, nterms) returns an array containing one entry for each group of nterms terms between first and last; each entry records the group's boundaries and its partial sum:

    @everywhere function leibniz(first, last, nterms)
        r = [] 
        for i in first:nterms:last # Step from first to last by nterms
            i1 = i
            i2 = i+nterms-1
            # Append the next partial sum to r:
            push!(r, (i1, i2, 4*sum((-1)^(n+1) * 1/(2n-1) for n in i1:i2)))
        end  
        return r  
    end

The @everywhere macro, also imported by the -p flag, broadcasts function and variable definitions to all workers, which is necessary because the worker processes do not share memory with the main process. The leibniz() function is a direct translation of the Leibniz sum, using the sum() function to add up the terms in a generator expression. After every group of nterms terms, it pushes that group's partial sum, together with the indices that bound the group, onto the r array, which it returns when done.

In order to test the performance of pmap(), we can use it to simultaneously calculate the first 2 × 10^8 terms:

    julia> @btime r = pmap(n -> leibniz(n, n+10^8-1, 1000), [1, 10^8+1]);
      7.127 s (1600234 allocations: 42.21 MiB)

The version using the normal map, which uses a single process, takes almost twice as long (but uses 23% of the memory):

    julia> @btime r = map(n -> leibniz(n, n+10^8-1, 1000), [1, 10^8+1]);
      13.630 s (200024 allocations: 9.76 MiB)

The figure below shows the results of summing the first 2 × 10^8 terms of the Leibniz series.

[Leibniz sum convergence]

The distributed version consumed significantly more memory than the normal map(), something that I observed in most cases. pmap() seems best suited to coarse-grained multiprocessing; it's an effective way to speed up an expensive operation over a small collection of inputs.

Julia's machinery for distributed processing also works across machines. Given a file of host addresses, Julia will launch worker processes on those hosts over SSH, using public-key authentication, allowing distributed code developed locally to run with little modification on a network of suitably equipped computers.
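
Remote workers can also be added from within a running session using addprocs(); here is a hedged sketch, with hypothetical host names and assuming passwordless SSH is already configured:

    using Distributed

    # each tuple is (host, number of worker processes to start there)
    addprocs([("node1.example.com", 2), ("node2.example.com", 2)])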

Tasks and task migration

Another type of concurrency that Julia supports can be described as asynchronous multithreading. It also computes with Julia tasks, but the tasks are assigned to multiple threads within a single process, so they all have access to the same memory while cooperatively multitasking. For the examples in this section I started Julia with julia -t2, so I had one thread for each CPU core.

We can start a task, and automatically schedule it on an available thread, with a macro: a = Threads.@spawn expr. This returns a Task object; we can wait for the Task and read the return value of expr with fetch(a).
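
A minimal sketch of the pattern (the computation is an arbitrary stand-in, not from the article):

    t = Threads.@spawn sum(rand() for _ in 1:10^7)  # scheduled on an available thread
    # ... the program can do other work here ...
    result = fetch(t)   # wait for the task and read the value of the expression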

For calculations involving a set of tasks with roughly equal run times, we should expect a close-to-linear speedup with respect to the number of CPU threads. In previous versions of Julia, once a task was scheduled on a thread, it was stuck there until it finished. This could create scheduling problems when some tasks take longer to run than others: the threads hosting the lighter tasks may become idle while the heavy tasks continue to compete for time on their original threads. In those cases the speedup falls short of linear; resources go to waste.

In version 1.7 of Julia, tasks are allowed to migrate among threads. Tasks can be rescheduled to available threads rather than remaining stuck where they were originally assigned. This feature is still largely undocumented, but I verified that tasks launched with Threads.@spawn can migrate upon calling yield(), which is the function that suspends a task and allows another scheduled task to run.
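
One way to observe this directly (a small sketch; whether the task actually moves depends on the scheduler and on what else is running) is to record the thread id before and after a yield():

    t = Threads.@spawn begin
        before = Threads.threadid()
        yield()                      # the task may resume on a different thread
        (before, Threads.threadid())
    end

    fetch(t)   # a pair of thread ids; on Julia 1.7 they can differ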

In order to directly observe the effect of thread migration, I needed to be able to turn it on and off. It's possible to compile a version of Julia without the feature enabled, but there is an easier way. The ThreadPools package provides a set of useful macros for concurrency. I used its @tspawnat macro in my experiments. Tasks spawned with this macro can migrate, just as tasks spawned using Threads.@spawn, but using @tspawnat allowed me to easily make a modified version that spawns tasks that are not allowed to migrate.

Monte Carlo physics simulations or probabilistic modeling are ideal types of calculations for asynchronous multithreading. This approach involves running a set of simulations that are identical aside from the series of pseudo-random numbers that control the details of each task's calculation. All the tasks are independent of each other, and when they are complete, various averages and their distributions can be examined.

For my experiments I wrote a simple program to look at the statistical properties of Julia's pseudo-random number generation function rand(), which returns a uniformly distributed random number between 0 and 1 when called with no arguments. The function below, when passed a number n, generates a random number n times, reporting the mean for each group of n/10 numbers:

    function darts(n)
        if n % 10 != 0 || n <= 0
            return nothing
        end
        a = Threads.threadid()   # the thread this task starts on
        means = zeros(10)
        for cyc in 1:10
            yield()              # let the scheduler switch tasks, or migrate this one
            s = 0
            for i in 1:Int(n/10)
                s += rand()
            end
            means[cyc] = s/(n/10)
        end
        # t0 is a global start time, set when the tasks are launched (see below)
        return (a, Threads.threadid(), time() - t0, n, means)
    end

The program only works with an n that is a multiple of 10. It has a few additional lines to keep track of which thread it ran on and to record the completion time, which it calculates as elapsed time from a global t0. Before each of the 10 cycles, it calls yield(), which allows the scheduler to switch to another task on the thread and, with thread migration, to move the task to another thread.

As an example, if we run darts(1000), we'll get 10 mean values back, each one the result of picking 100 random numbers. According to the central limit theorem these mean values should have a normal distribution with a mean that is the same as that of the underlying (uniform) distribution.

In order to get a sample and plot its distribution, I launched 20 tasks, each running the darts() program with n = 10^7. This ran 200 random trials, each one picking one million random numbers, then calculating and recording their mean. We also need a highly accurate estimate of the distribution's mean, so I launched one additional task with n = 10^9 to calculate this. We know that the theoretical mean should be 0.5, so this will check for bias in rand(). With a trivial modification the procedure can be used with other underlying distributions whose means we may not know analytically.

I launched the tasks with the following:

    using ThreadPools   # provides the @tspawnat macro

    begin
        t0 = time()
        tsm = [@tspawnat rand(1:2) darts(n) for n in [repeat([1e7], 20); 1e9]]
    end

All of the tasks have access to t0, so they can all report the time at which they finished. The call rand(1:2) assigns each task initially to one thread or the other with equal probability. I repeated this experiment five times, and ran another experiment five times, identical except for the use of my modified @tspawnat macro that disables thread migration.

To look at the distribution from any one of the experiments we can collect the sample means into one array and plot it with the histogram() function that comes with the Plots package; an overview of the plotting system explains how to use histogram() and the other plotting functions used in this article. The result is shown in the next figure, with a bar superimposed showing the mean from the task using 10^9 numbers.
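
The collection step might look something like the following sketch (not the author's code): it assumes the tsm array of tasks defined above, pulls the fifth element of each returned tuple (the vector of means), and uses histogram() and vline!() from Plots along with mean() from the Statistics standard library:

    using Plots, Statistics

    results = fetch.(tsm)                                  # wait for all 21 tasks to finish
    sample_means = vcat((r[5] for r in results[1:20])...)  # 200 means from the 20 short tasks
    ref = mean(results[21][5])                             # high-accuracy estimate from the 10^9 task
    histogram(sample_means)
    vline!([ref])                                          # superimpose the reference mean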

[Random number distribution]

The distribution is approximately normal with the correct mean.

The experiment consists of 20 short tasks and one task that takes 100 times as long. I was optimistic that this would exhibit the effects of thread migration, as the scheduler would have the opportunity to move tasks off of the thread containing the long-running calculation. This is the type of situation that thread migration was intended to improve. Calculations involving a group of tasks with similar resource requirements would not be expected to benefit from migration.

Averaged over the five experiments, the mean of the 21 completion times was 0.71 seconds with thread migration enabled and 1.5 seconds with it disabled. A detailed look at the timings shows how thread migration allows the job to complete twice as quickly. The next figure shows the return times for all 10 experiments, with a line drawn to guide the eye through the times for the fastest one.

[Task migration plot]

A separate timing showed that single runs take 0.024 seconds for darts(1e7) and 2.4 seconds for darts(1e9), the ratio of run times that we would expect. Trial number 21 is the task calling darts(1e9), which takes about 2.5 seconds when it is run in parallel. The plot makes it clear that, without migration, about half the tasks get stuck sharing the thread with the single heavy task. The other half return quickly, after which an available thread goes to waste. In contrast, thread migration moves tasks onto the free thread, leading to better load balancing and a quicker overall completion. Looking at the values returned by the Threads.threadid() calls confirms that about half the tasks migrate.

This experiment shows that the new thread-migration feature works as intended. It can speed up asynchronous multithreaded calculations in cases where tasks have significantly unequal run times, or where some tasks block waiting for I/O or other events.

In calculations like the ones in this and the previous section, the tasks proceeded completely independently of each other; no coordination was required. If a program has locations where it must wait for all the asynchronous tasks to complete before moving on to the next stage, it can use the @sync macro. This will wait until all tasks spawned in the scope where the macro appears have finished.
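
A minimal sketch of the pattern (some_work() is a hypothetical function standing in for a real computation):

    @sync begin
        for n in 1:4
            Threads.@spawn some_work(n)   # some_work() is hypothetical
        end
    end
    # execution reaches this point only after all four tasks have completed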

Conclusion

Concurrency is inherently difficult. Concurrent programs are harder to debug and to reason about. Julia doesn't erase this difficulty; the programmer still needs to watch out for race conditions and to ensure consistency and correctness. But Julia's macros and functions for tasks and data parallelism make simple things easy in many cases, and more complex problems approachable.

This is also an area where development is active, from packages such as Folds to implementation details like the operation of the scheduler, which has most recently led to the task migration feature. We can expect Julia's concurrency situation to continue to improve. We can also expect some of the best parts of the concurrency ecosystem to remain outside of the standard library and the official documentation; the programmer who wants to take the best advantage of these offerings will need to make an effort to keep up with the changing landscape.



Concurrency in Julia

Posted Nov 10, 2021 4:30 UTC (Wed) by rsidd (subscriber, #2582) [Link] (8 responses)

Thanks for all the articles on Julia. Interesting point on using multiple dispatch in Julia to check for invalid data types without polluting your main function with checks. But shouldn't the type-check function throw an exception, rather than return a string value? It's equally short and better programming practice I'd think, and wouldn't confuse LWN readers. A minor and tangential matter, but a bit of a turn-off when it's right at the top of the article.

Concurrency in Julia

Posted Nov 10, 2021 5:24 UTC (Wed) by leephillips (subscriber, #100450) [Link]

I suppose throwing an exception is a better idea. But then I would have to explain the syntax for that, and my version does what I want.

Concurrency in Julia

Posted Nov 10, 2021 14:57 UTC (Wed) by garrison (subscriber, #39220) [Link] (6 responses)

I think the version in the article is great in that it succinctly illustrates the important point to a reader potentially unfamiliar with Julia. If we want to talk about idiomatic Julia, however, instead of defining a second isprime method that throws an exception, one should simply get rid of the second method. When no method is given with compatible types, a MethodError will be thrown, and this is the best way to communicate that a method has not been implemented for the given type(s).

Another comment about making isprime more idiomatic: instead of specializing on Int (which, as the article notes, is a synonym for Int64), it could specialize on Integer instead. Integer is an abstract type that includes all integers, so with this change, the function would still work if passed e.g. an Int32. (The JIT will automatically compile specialized machine code for each case encountered.)

Concurrency in Julia

Posted Nov 10, 2021 16:06 UTC (Wed) by leephillips (subscriber, #100450) [Link]

All relevant observations! The Integer type would also include BigInt, so you could supply as big a prime candidate as you wanted.

Concurrency in Julia

Posted Nov 10, 2021 16:08 UTC (Wed) by droundy (subscriber, #4559) [Link] (3 responses)

Wouldn't using smaller int types simply make overflow almost inevitable? An Int version of factorial is bad enough in terms of uselessness.

Concurrency in Julia

Posted Nov 10, 2021 22:28 UTC (Wed) by mathstuf (subscriber, #69389) [Link] (2 responses)

What `isprime` algorithm requires values larger than the queried number (assuming non-negative values)?

Concurrency in Julia

Posted Nov 12, 2021 4:34 UTC (Fri) by droundy (subscriber, #4559) [Link] (1 responses)

My brain somehow converted it to a factorial program. Not sure how that happened.

Concurrency in Julia

Posted Nov 12, 2021 6:29 UTC (Fri) by rsidd (subscriber, #2582) [Link]

Even for factorial, the parameter type can be anything, the function body can convert it to bigint if needed.
julia> function factorial(n::Integer)
           return prod(1:convert(BigInt,n))
       end
factorial (generic function with 1 method)

julia> factorial(convert(Int32,100))
93326215443944152681699238856266700490715968264381621468592963895217599993229915608941463976156518286253697920827223758251185210916864000000000000000000000000

Concurrency in Julia

Posted Nov 11, 2021 5:36 UTC (Thu) by rsidd (subscriber, #2582) [Link]

I'm only an occasional user of Julia but should have thought of that point, that if you don't define the second function a method exception will be thrown anyway for wrong type of argument. But leaving it out would confuse readers...

Concurrency in Julia

Posted Nov 10, 2021 15:39 UTC (Wed) by xecycle (subscriber, #140261) [Link]

Threads in Julia indeed was a part that confused me a lot when getting started. Wow I couldn’t find a primitive to start a new OS thread? Either I can use the auto-parallelize tools feeling very similar to OpenMP, or I can go with M:N user threads.

Now I think I have a better grasp of the whole picture and no longer worry about that. I used to think of Julia as “a high-performance general-purpose language with sugar for numeric computation”, but that view doesn’t fit well. Perhaps it is better viewed as “a high-performance numeric computing DSL with abilities to do some general-purpose programming”. The community measures and improves performance of Julia based on FLOPS numbers, instead of e.g. packets/s or 99.9% response time; this is not the tool for me if I wrap up a flat-combining data structure every other day, but it’s an excellent choice for crunching terabytes of tabular data, millions-by-millions sparse matrices, etc. Its bias towards OLAP made it a great tool, but also made some scenarios harder. The term “high performance” needs some qualifications, but well, the OLTP and OLAP communities never agreed with each other on this topic…

Concurrency in Julia

Posted Nov 12, 2021 20:24 UTC (Fri) by ballombe (subscriber, #9523) [Link] (3 responses)

> This allows access to all of the logical threads on the machine. This is often not optimal, and a better choice can be -t n, where n is the number of physical cores.

This is true, and this is not a Julia-specific issue. However, OS make it much easier to query the number of logical threads than of physical threads.

Concurrency in Julia

Posted Nov 12, 2021 21:24 UTC (Fri) by leephillips (subscriber, #100450) [Link] (2 responses)

That’s true, which is why I thought it worth mentioning the lscpu command, which will tell you the truth about your CPU.

Concurrency in Julia

Posted Nov 12, 2021 21:30 UTC (Fri) by jake (editor, #205) [Link] (1 responses)

> That’s true, which is why I thought it worth mentioning the lscpu
> command, which will tell you the truth about your CPU.

some darned editor may have dropped that line on the editing room floor ... oops :)

jake

Concurrency in Julia

Posted Nov 12, 2021 21:46 UTC (Fri) by leephillips (subscriber, #100450) [Link]

The oops is mine—I didn’t realize that was cut. But for those with the wisdom to read the comments, there it is. It’s a shell command, nothing to do with Julia.

