RxJava is a NetFlix open source library that they developed as part of optimizing their architecture. The library is related to the “Reactive programming” pattern:
“RxJava is an implementation of Reactive Extensions – a library for composing asynchronous and event-based programs using observable sequences for the Java VM”.
Dec 26, 2015: Read an article on ribot’s site, “Unit Testing RxJava Observables“, that RxJava already has experimental support for unit testing assertions.
Approach
I thought I would try my hand at it. I used the ‘Hello world’ example on the RxJava wiki’s getting started page:
public static void hello(String... names) {
Observable.from(names).subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println("Hello " + s + "!");
}
});
}
As a source of gradual complexity, I used a list of strings for the target output, and I also don’t include the end “!” in the input stream. With this I was able to use more of the library to get a feel for the syntax and how it relates to the underlying concepts.
Learning Reactive
One problem with many introductions to RxJava is that they start off with overly simple or complex examples, and these complex examples require domain knowledge of some application. BTW, the worse example of this tendency was a book on Design Patterns where the author used obscure sports racing concepts and so forth.
Another problem is that the major Operators are very abstractly documented. Just look at the and/then/when or join operators. You’d think each Rx implementation’s documents would give examples for each operator. Update: I found one tutorial that has much more RxJava examples: Intro-To-RxJava
Example
Below I created a JUnit test class in Java that creates an Observable from an array [“Hello”, “world”], and each test subscribes to it. That is, the test is the Observer. The subtle complexity is that the required “!” at the end of result string is not in the String array. How is that added withing the ‘Rx’ pattern? RxJava has a rich API, thus there are many ways to take that array and create the “Hello world!” string.
Concats
Test2 in the source listing 1 is my attempt to avoid ‘programmatic’ addition of the ending “!” to the String result. Instead, I concat three Observables: a concatMap that takes each streamed in String and creates a new Observable with [data, ” “], an Observable that skips the last [data,” “], and finally an Observable that just creates the “!”.
Yes, this is a lot of complex code for this simple task. The point was to use a simple task to learn the complex code. Later, the complexity would be tamed or matched to the task at hand.
How to fail JUnit test
Since I used JUnit tests to write the code, I wondered how would you detect that an RxJava operator got an error and failed the test? For example, during the onCompleted method? An error there will not invoke the onError method since the Observable is done generating any data. Thus, in Test4, I had to wrap the assertion fail in a try catch, then manually invoke the onError method. Probably semantically incorrect approach. But, that still would not make the JUnit test fail, I had to set a field so that the last statements in the test would use that to fail the test. Seems like a kludge.
Dec 26, 2015
Using RxJava’s test support is the way to go for simple assertions. Below I use TestSubscriber.
/**
* Dan Lew's example in 'Grokking RxJava, Part 1: The Basics'.<p>
* Converted to a test.
* @see "http://blog.danlew.net/2014/09/15/grokking-rxjava-part-1/"
* @throws Exception
*/
@Test
public final void test6() throws Exception{
TestSubscriber<String> testSubscriber = new TestSubscriber<>();
Observable.just("Hello, world!")
.map(s -> s + " -Dan")
.map(s -> s.hashCode())
.map(i -> Integer.toString(i))
.subscribe(testSubscriber);
// .subscribe(s -> System.out.println(s)); // -
testSubscriber.assertValue("-238955153");
}
I’ll leave most of the sample code as is for now. But, its best to use the library to its fullest before you create your own testing kludge.
Summary
Have not grokked RxJava yet, so the above is probably not idiomatic use or correct.
RxJava looks like a powerful system, but it is very sophisticated and may require a long grok curve. There are various YouTube videos and other presentations that give a contextual understanding of RX in general. Ben Christensen’s presentations are awesome.
The “React” term?
Many things are using some form of this term. For example, ReactJs, the M~V library from FaceBook. Sure, it relates, however the concept of event streams is not central to the library’s conceptual model, afaik.
JavaScript
Perhaps one of the most important use of RX, in terms of usage, is on the client side. One library is RxJS.
Cycle.js
There is even a framework, Cycle.js created by André Staltz, née Medeiros, that combines the “reactive” concept with RxJS to create a new paradigm of client structure called Model View Intent (MVI). Some intro videos on CycleJS library:
“Learning Reactive Programming with Java 8”, Nickolay Tsvetinov; Packt Publishing, June 24, 2015, Web ISBN-13: 978-1-78528-250-8, Print ISBN-13: 978-1-78528-872-2; link
Listing 1, Full Source
Having the source as a JUnit test is helpful since the code can be used as a means to experiment and extend, while having the ability to easily test regressions.
In a prior post “Java Plain Old Concurrent Object” I wrote about a need for a higher level concurrency support in Java. Here I give the program code that I used to experiment with concurrency and learn a little more.
Summary
Presented are a few ways of coding a concurrent program using Java threading and also using a Communicating Sequential Processes (CSP) library. They are written in the Groovy language. Used as an example is a simple lottery number generation application.
In a prior blog post(Java Plain Old Concurrent Object) I wrote about a possible need for a higher level concurrency support in Java. Here I give the program code that I used in September 2007 to learn more then the rudimentary Java concurrency concepts.
I managed to use CSP and the usual Java concurrent library support, and recently was about to try Actors using the new GPars library. It was a good way to get a feel for each approach. Note that it was not enough to become a concurrency expert; that was not a goal; still studying Goetz’s book Java Concurrency In Practice.
My first use of multitasking code was in C++ code I did in 1992:
… technical hurdles with the product, so I put it aside until I could get back to it. One thing that I was proud of was learning a little bit of Fuzzy Logic and using it as the controller. I even wrote a graphical simulator in the C++ language; threading was fun stuff. Watching the fuzzy sets behave like analog surfaces or neural EEG waves gave me the idea for the biomimicry aspects. — An Adaptive Controller Using Fuzzy Memory
Problem
For code example I used lottery number play generator. Powerball is a U.S. multi-state lottery game. A Powerball play is composed of 5 white balls in range 1 – 55 and one red ball in the range of 1 – 42. As you can imagine the odds are extremely high that you could guess the Jackpot set (1 in 195,249,054). The jackpot gets very large and that is a great enticement to forgo any sanity and “you can’t win if you don’t play” you know.
If you enjoy the frills, ambiance, and clientele, gambling is a great way to throw away money. For more on Lottery see: Lottery Links back to top
Source Code
To simulate real random number generation, the program will continuously generate plays, but only produce the requested numbers of generated sets when the user clicks the enter key. I’m assuming a user waiting a certain amount of time adds a random element to the PRNG being used. Does it? I don’t think so, but that’s another topic.
A few points:
The problem could have been solved without using concurrent approach.
Example code use at your own risk. No correctness or quality guaranteed.
A Thread monitor view using VisualVM doesn’t show much, just six threads with four daemon ones.
We use the following batch file, which simply invokes the Main.groovy script.
@rem File: run.cmd
@rem Created 28 Dec 2008 J. Betancourt
call setEnv.cmd
groovy ..\src\main\groovy\net\coxmembers\jbetancourt1\Main.groovy %*
The main script driver is shown in listing 1. Two arguments are specified, how many dollars to lose and what type of concurrency approach to use. Executing it with no arguments would give:
c:Users\jbetancourt\Documents\projects\dev\ConcurrentGroovy-1bin>run
usage: usage -t type -d dollars [-h]
-d,--dollars How many dollars to lose
-h,--help Usage information
-t,--type Type of threading
Options for '-t' are:
"simple" ----> "Using simple Threads"
"lock" ----> "Using lock"
"executor" ----> "Using Futures"
"csp" ----> "Using JCSP"
"actor" ----> "Using Actors"
Example: run -d 5 -t simple
And, here is a sample run:
c:Users\jbetancourt\Documents\projects\dev\ConcurrentGroovy-1bin>run -d 5 -t simple
Setting up games 5 using "simple"
The odds of winning jackpot are: 1 in 146,107,962.00
Enter return key
Number of games generated are: 5
1: 7 11 14 23 34 PB: 9
2: 21 22 27 28 55 PB: 21
3: 2 9 30 37 46 PB: 30
4: 7 49 50 51 53 PB: 30
5: 9 18 24 40 42 PB: 32
Good luck!
Success?
Not really. A funny thing about this code, if you invoke it and quickly hit the return key, it doesn’t work correctly. Concurrent code can be hard to craft.
Example:
>run -d 5 -t executor
Setting up games 5 using "executor"
Press the Enter key on keyboard
Number of games generated are: 1
1: 18 20 22 27 41 PB: 25
Good luck!
Listing 1 Main driver class:
package net.cox.members.jbetancourt1;
import org.apache.commons.collections.buffer.CircularFifoBuffer
import org.apache.commons.collections.Buffer
import java.security.SecureRandom
import org.apache.commons.collections.BufferUtils
/**
* Generate Powerball plays.
* Required libraries:
* - Commons CLI
* - Commons collection
*
* A Powerball play is composed of 5 white balls in range 1 - 55
* and one red ball in the range of 1 - 42
* This script gets how many games to play, then
* continually generates this number of games
* and adds them into a circular buffer. This is
* repeated until the user hits a key, which
* interrupts the generation thread and dumps the results.
*
* A circular buffer is used since the results are
* only needed when the user hits a key,
* thereby making it "random". In actuality,
* since we are using a psuedo-random
* generator, this is still not true random.
* There is no way to improve the odds of
* picking the correct numbers for a lottery game.
*/
class Main{
def apps = [
"simple": [new GuessSimple(),"Using simple Threads"],
"lock": [new GuessWithLock(),"Using lock"],
"executor": [new GuessWithExecutor(),"Using Futures"],
"csp": [new GuessCSP(),"Using JCSP"],
"actor" : [new GuessActor(),"Using Actors"]
]
/** Main entry point */
static main(args){
def main = new Main()
def options = main.parseArguments(args)
if((options != null ) && options.d && options.t){
println "Setting up game" + (options.d>0? "s": "") +
" ${options.d} using "${options.t}""
def dollars = Integer.parseInt(options.d)
def games = BufferUtils.synchronizedBuffer(
new CircularFifoBuffer(dollars))
def threadType = ""
if(options.t){
threadType = options.t
}
def guess = main.apps[threadType][0]
guess.generate(dollars,games)
}
System.exit(0)
} // end main
/****************************************************************************
* Using CLI builder, parse the command line.
*/
def OptionAccessor parseArguments(String[] args){
def cli = new CliBuilder(usage: 'usage -t type -d dollars [-h]')
cli.h(longOpt: 'help', 'Usage information')
cli.t(longOpt: 'type', 'Type of threading',args:1)
cli.d(longOpt: 'dollars','How many dollars to lose',args:1,required:false)
def options = cli.parse(args)
if(options == null || options.getOptions().size() == 0 || options.h){
cli.usage()
println "nOptions for '-t' are: "
apps.each{
println("t"${it.key}"".
padRight(12) + " ----> "${it.value[1]}"")
}
println "nExample: run -d 5 -t simple"
return options
}
return options
}
} // end class Main
Listing 2 Base class:
<pre>package net.cox.members.jbetancourt1;
import org.apache.commons.collections.buffer.CircularFifoBuffer
import org.apache.commons.collections.Buffer
import java.security.SecureRandom
import org.apache.commons.collections.BufferUtils
/**
*
*/
class GuessBase{
def whiteBalls = [0]
def redBalls = [0]
def random = new SecureRandom()
//each element is an array of 0-4 white ball values and 5 is the red ball value.
def games //= BufferUtils.synchronizedBuffer(new CircularFifoBuffer(dollars))
def createBallSets(){
def i = 0
for(j in 1..55){
whiteBalls[i++] = j
}
i = 0
for(j in 1..42){
redBalls[i++] = j
}
}
/** Pick the set of white balls */
def pickWhite(){
def array = []
array.addAll(whiteBalls)
Collections.shuffle(array,random)
return array[0..4]
}
/** swap two elements in a list */
def swapElements(list,i,j){
def temp = list[i]
list[i] = list[j]
list[j] = temp
}
/** Pick the single red 'powerball'. */
def pickRed(){
int offset = random.nextInt(redBalls.size())
return redBalls[offset]
}
def showGames(){
def gamesSorted = games.sort(){a,b -> a[5] <=> b[5]}
def rownum = 1
println "Number of games generated are: ${games.size()}"
for (loop in gamesSorted){
print ((rownum++ + ": ").padLeft(8))
for(x in ( loop[0..4]).sort() ){
print ((x.toString()).padLeft(3))
}
println " PB:" + ((loop[5]).toString()).padLeft(3)
}
println "Good luck!"
}
} // end class GuessBase
Listing 3 Using simple Java Thread class:
package net.cox.members.jbetancourt1;
/**
* Example that uses a simple 'inline' thread.
*/
class GuessSimple extends GuessBase{
/** create d guesses and store into games array */
def synchronized generate(d,games){
this.games = games
createBallSets()
def worker = new Thread();
worker.setDaemon(true);
worker.start{ // Groovy allows closure here.
while(!Thread.currentThread().isInterrupted()){
def w = pickWhite()
def r = pickRed()
games.add((Object)(w+r))
random.setSeed(System.currentTimeMillis());
}
}
println "The odds of winning jackpot are: 1 in 146,107,962.00nEnter return key"
new InputStreamReader(System.in).readLine()
worker.interrupt()
Thread.sleep(1000) // required!
showGames()
}
} // End of GuessSimple.groovy
Listing 4 Using Object lock:
package net.cox.members.jbetancourt1;
/**
*
*/
class GuessWithLock extends GuessBase{
def lock = new Object() // for thread synchronization
/** create d guesses and store into games array */
def generate(d,games){
this.games = games
createBallSets()
def worker = new Thread(){
void run(){
while(!Thread.currentThread().isInterrupted()){
def w = pickWhite()
def r = pickRed()
games.add((Object)(w+r))
random.setSeed(System.currentTimeMillis());
}
synchronized(lock){
lock.notify()
}
}
}
worker.start()
println "The odds of winning jackpot are: 1 in 146,107,962.00"
println "Enter any key<cr>"
new InputStreamReader(System.in).readLine()
worker.interrupt()
showGames()
}
}
Listing 5 Using Executor:
/**
* File: GuessWithExecutor.groovy
* Author: Josef Betancourt
* Date: 9/1/2007
*
*/
package net.cox.members.jbetancourt1;
import org.apache.commons.collections.*
import java.util.concurrent.*
/**
* Generate powerball plays using Executor.
*/
class GuessWithExecutor extends GuessBase {
/** create d guesses and store into games array */
def generate(d,games){
def dollars = d
try{
def exec = Executors.newSingleThreadExecutor()
def gen = new GameGenerator<Buffer>(dollars,games)
gen.createBallSets()
def future = exec.submit(gen)
println "Press the Enter key on keyboard"
new InputStreamReader(System.in).readLine()
exec.shutdownNow()
waitForTermination(exec,100,10) // 100ms, max of 10 tries
try{
def buffer = future.get()
def exec2 = Executors.newSingleThreadExecutor()
exec2.execute(new GenerateReport(buffer))
exec2.shutdown()
}catch(ExecutionException ignore){
ignore.printStackTrace()
}
}catch(Exception ex){
ex.printStackTrace()
}
}
/**
*
* @param exec the Executor
* @param time how long to wait
* @param unit unit of time
* @param tries how many times to wait
*/
def waitForTermination(exec,time,tries) throws Exception {
def count = 0
while(!exec.awaitTermination(time, TimeUnit.MILLISECONDS)){
if(count++ >= tries){
break
}
}
}
} // end class GuessWithExecutor
/**
* A Callable that generates games until interrupted.
*/
def class GameGenerator extends GuessBase implements Callable {
def GameGenerator(dollars,games){
this.games = games
}
/**
* Create d guesses and store into games array.
* @see java.util.concurrent.Callable#call()
*/
public Object call() throws Exception {
while(!Thread.currentThread().isInterrupted()){
def w = pickWhite()
def r = pickRed()
games.add((Object)(w+r))
random.setSeed(System.currentTimeMillis());
}
return games
}
} // end class GameGenerator
/**
* Class to render the results
*/
def class GenerateReport extends GuessBase implements Runnable {
def GenerateReport(Buffer games){
this.games = games
}
public void run(){
showGames()
}
} // end class GenerateReport
Listing 6 Using Fork/Join:
// to do: approach will be to fork each required game set into its own thread, then join them to get the final result. Crazy, but a way to learn about Fork/Join and have sample code.
CSP
Some experts are touting the benefits of CSP. Below I use the jCSP library to create a network of processes that collaborate to solve the same problem. Note that some of the people involved in jCSP are adding some CSP support to GPar.
Listing 7 Using jCSP:
/**
* File: CspGuess.groovy
* Author: Josef Betancourt
* Date: 9/1/2007
*
*
*/
package net.cox.members.jbetancourt1;
import jcsp.lang.*;
/**
* Generate powerball plays using CSP.
*
* This version of the example uses the JCSP library. Note
* that unlike the other examples, there is an explicit
* declaration of a 'network'.
*
* Required libraries:
* - JCSP ver. 1.0-rc8 CSP library in Java:
* <a href="http://www.cs.ukc.ac.uk/projects/ofa/jcsp"></a><br/>
*
* Uses PAR and ALT methods presented in:
* <a href="http://www.soc.napier.ac.uk/publication/op/getpublication/publicationid/9097759">
* "Groovy Parallel! A Return to the Spirit of occam?"</a>
* by Jon KERRIDGE, Ken BARCLAY, and John SAVAGE
* The School of Computing, Napier University, Edinburgh EH10 5DT in
* Communicating Process Architectures 2005 13
* Jan Broenink, Herman Roebbers, Johan Sunter, Peter Welch, and David Wood (Eds.)
* IOS Press, 2005
*
* Perhaps, a better approach is possible using actors? See for example,
* <a href="http://gpars.codehaus.org/">Groovy Parallel Systems</a>
*
*/
class GuessCSP extends GuessBase {
def generate(dollars,games){
def suspendChannel = new One2OneChannel()
def resultChannel = new One2OneChannel()
// Create the CSP network and run it.
new PAR( [
new GameProcess(suspendChannel,resultChannel,games,dollars),
new UserInputProcess(suspendChannel),
new ReportProcess(resultChannel)]
).run()
}
}
/** process that generates games */
def class GameProcess extends GuessBase implements CSProcess{
def out
def suspend
def dollars
/** Create the process instance */
def GameProcess(done,out,games,dollars){
this.dollars = dollars
this.suspend = done
this.out = out
this.games = games
createBallSets()
}
/** run the process network */
public void run(){
def alternative = new ALT([suspend, new Skipper()])
def STOPPING=0, RUNNING=1
println "Creating ${dollars} game${(dollars>0? "s": "")} "
def suspended = false
while (!suspended){
switch (alternative.priSelect ())
{
case STOPPING:
suspend.read();
if(games.size()>0){
suspended = true;
out.write(games)
}
break;
case RUNNING:
def w = pickWhite()
def r = pickRed()
games.add((Object)(w+r))
random.setSeed(System.currentTimeMillis());
break
}
}
} // end run()
} // end GameProcess process
/** Process that gets the user input */
def class UserInputProcess implements CSProcess{
def suspend
def UserInputProcess(done){
this.suspend = done
}
public void run(){
println "Enter any key<cr>"
new InputStreamReader(System.in).readLine()
suspend.write(new Object())
}
} // end UserInputProcess process
/**
* Process to render the results to console
*/
def class ReportProcess extends GuessBase implements CSProcess{
def input
def ReportProcess(input){
this.input = input
}
public void run(){
games = input.read() // get result via channel only
showGames()
}
} // end of ReportProcess process
/**
* PAR A Groovyish Parallel.
*/
def private class PAR extends Parallel {
PAR(processList){
super( processList.toArray(new CSProcess[0]))
}
}
/**
* ALT A Groovyish Alternative.
*/
def private class ALT extends Alternative {
ALT (guardList) {
super( guardList.toArray(new Guard[0]) )
}
}
// end of CspGuess.groovy
Actor
I started to look into the Actor approach. Below is the start of code to solve the same problem. Even with the few lines of code, attempts to run it give an an exception, and classpath and other easy fixes do not solve it. On my todo list.
Setting up games 5 using "actor"
start
Caught: groovy.lang.MissingMethodException: No signature of method: static groovyx.gpars.actor.Actor.actor() is applicable for argument types: (net.cox.members.jbetancourt1.GuessActor$_generate_closure1) values: [net.cox.members.jbetancourt1.GuessActor$_generate_closure1@1fe571f]
Possible solutions: stop(), wait(), start(), any(), call(java.lang.Object), wait(long)
at net.cox.members.jbetancourt1.GuessActor.generate(GuessActor.groovy:55)
at net.cox.members.jbetancourt1.Main.main(Main.groovy:63)
Listing 8 Using Actors:
package net.cox.members.jbetancourt1
//import static groovyx.gpars.actor.Actors.actor
import groovyx.gpars.actor.*
import groovyx.gpars.actor.Actor.*
// @Grab(group='org.codehaus.gpars', module='gpars', version='0.9')
// @GrabResolver(name='jboss', root='http://repository.jboss.org/maven2/')
/**
*/
class GuessActor extends GuessBase {
def main = new GuessActor()
def dollars
def games
main.generate(dollars, games)
}
def generate(dollars,games){
println "start"
def gen = Actor.actor { index ->
loop {
react {message ->
if (message instanceof String) reply "got it"
else stop()
}
}
}
} // end of class GuessActor
Declarative CSP Using Spring
While experimenting I had an idea that ordinary Java beans could be used in a CSP network with suitable object wrappers or weaving. That is, an ordinary POJO could be wrapped to appear as a Process, just hook up the bean’s entry point for service into the CSP channel end points. This would be similar to the way that ordinary beans can be annotated to behave as Active Objects in the GPars Groovy library.
Below is the Spring Framework configuration file that does this for this software example. The code for the wrapper objects are not included here, too kludgey.
Off topic: The XML language config of Spring is looking long in tooth. Good thing Spring now supports annotations and Java config. Time for a Groovy builder approach too?.
<?xml version="1.0" encoding="UTF-8"?>
<!--
File: applicationContext.xml
Spring Framework based bean definitions for JCSP example program.
Author: Josef Betancourt
Date: 9/19/2007
-->
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:lang="http://www.springframework.org/schema/lang" xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://www.springframework.org/schema/lang
http://www.springframework.org/schema/lang/spring-lang-2.0.xsd
http://www.springframework.org/schema/util
http://www.springframework.org/schema/util/spring-util-2.0.xsd">
<!-- ========================================================= -->
<!-- CSP NETWORK -->
<!-- ========================================================= -->
<bean name="network" dependency-check="objects" class="jcsp.lang.Parallel">
<constructor-arg>
<list>
<ref bean="gameProcess" />
<ref bean="reportProcess" />
<ref bean="userInputProcess" />
<ref bean="progressProcess"/>
</list>
</constructor-arg>
</bean>
<!-- ========================================================= -->
<!-- CHANNELS -->
<!-- ========================================================= -->
<bean name="suspendChannel" class="jcsp.lang.One2OneChannel"/>
<bean name="outputChannel" class="jcsp.lang.One2OneChannel"/>
<bean name="progressChannel" class="jcsp.lang.One2OneChannel"/>
<bean name="skipperChannel" class="jcsp.lang.Skip"/>
<!-- ========================================================= -->
<!-- GUARDS -->
<!-- ========================================================= -->
<bean name="alternative" class="jcsp.lang.Alternative" dependency-check="objects">
<constructor-arg>
<list>
<ref bean="suspendChannel" />
<ref bean="skipperChannel" />
</list>
</constructor-arg>
</bean>
<!-- ========================================================= -->
<!-- POJOs -->
<!-- ========================================================= -->
<!-- Creates the game generator, but it is setup programmatically
since it needs the user supplied number of games to play, see GameGenerator.setup(int).
-->
<bean name="gameGenerator" class="net.cox.members.jbetancourt.pb.game.GameGenerator">
<property name="random">
<bean class="java.security.SecureRandom"/></property>
<property name="maxRed" value="42" />
<property name="maxWhite" value="55" />
</bean>
<!-- ========================================================= -->
<!-- PROCESSES -->
<!-- ========================================================= -->
<!-- Receives signal from user to accept current generated game sets.
-->
<bean name="userInputProcess" init-method="init" class="net.cox.members.jbetancourt.pb.process.JCspSingleShotProxy">
<property name="outputChannel" ref="suspendChannel" />
<property name="methodName" value="run"/>
<property name="target">
<bean class="net.cox.members.jbetancourt.pb.game.KeyListener" scope="prototype"/>
</property>
</bean>
<!-- Game generator. Communicates with input and report processes.
-->
<bean name="gameProcess" dependency-check="objects"
class="net.cox.members.jbetancourt1.pb.process.GameProcess">
<property name="alternative" ref="alternative" />
<property name="suspendChannelIn" ref="suspendChannel" />
<property name="reportChannelOut" ref="outputChannel" />
<property name="progressChannelOut" ref="progressChannel"/>
<property name="gameGenerator" ref="gameGenerator"/>
<property name="showInterim" value="false"></property>
</bean>
<!-- Output results to console.
-->
<bean name="reportProcess" dependency-check="objects"
class="net.cox.members.jbetancourt1.pb.process.ReportProcess" scope="prototype">
<property name="input" ref="outputChannel" />
</bean>
<!-- Output interim results to console
-->
<bean name="progressProcess" dependency-check="objects"
class="net.cox.members.jbetancourt1.pb.process.ReportProcess" scope="prototype">
<property name="input" ref="progressChannel" />
</bean>
</beans>
Listing 10 Java main to load the network:
package net.cox.members.jbetancourt1.pb;
import jcsp.lang.Parallel;
import net.cox.members.jbetancourt1.pb.game.GameGenerator;
import org.apache.commons.cli.*;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
/**
*
* Main program.
*
* @author JBetancourt
*
*/
public class Main {
private static final String TITLE =
"====== Lottery Generator using JCSP and Spring ============";
private static Parallel network;
private static ApplicationContext context;
/**
* Entry point for running the Guess application. Parses
* command line for number of games to play, loads
* wired application using Spring Framework
* based IoC, sets up game array, and then starts
* the network of processes.
* @param args
*/
public static void main(String[] args) {
System.out.println(TITLE);
int dollars = parseCommandLine(args);
if ( dollars == 0 ) {
return;
}
context = new ClassPathXmlApplicationContext(
new String[] { "applicationContext.xml"});
GameGenerator gen = (GameGenerator) context.
getBean("gameGenerator");
gen.setup(dollars);
network = (Parallel) context.getBean("network");
network.run();
}
/**
*/
private static int parseCommandLine(String[] args) {
... ellided ...
}
}
One possible conclusion is that there is no need for a ‘higher’ level concurrency support in Java or that this should evolve on its own before attempts at standardization are attempted.
Abstract
This blog post reiterates the existing critique of the concurrency support in the Java platform and language: that the low level thread and shared memory features should be improved upon with one or more high-level concurrency frameworks. Some of the competing frameworks are listed, and further references are given.
CR Categories: D1.3 [Concurrent Programming]; D.1.5 [Object-oriented Programming]; D.2.2 [Design Tools and Techniques]: Design Tools and Techniques — Software libraries;D3.2 [Language Classifications]: Concurrent, distributed, and parallel languages;D.3.3 [Language Constructs and Features]: Concurrent programming structures;
Keywords and Phrases: concurrency, Java, Actor Model, threads, Component based software engineering, Concurrent object-oriented programming, Distributed systems, Java CSP, CCS, multiprocessor
This is an excerpt from a document I wrote about three years ago. The doc was in a Java Specification Request (JSR) format and from time to time I updated the link section as I came upon interesting references. Maybe someone will find it interesting or it will spark a new idea for a project. In a later post I give some examples of concurrent code using the Groovy language.
Intro
The Java Platform has built-in support for concurrent programming. At the time of Java’s birth this was very big deal. But, is it now time to build upon this by the creation of a requirements document for high-level concurrency support in the Java language? Could this bring Java closer to a Concurrent Object-Oriented Language (COOL)?
Well, since I’m not a concurrency subject matter expert, just a working stiff developer, I will leave that to others, and just put my two cents here. What prompted me even looking into this subject was my experience on a few projects that required concurrency support.
What is needed is a survey or summary of what exactly are the issues, what is available, what their application areas are, and what are the development options. Some of initial tasks I see, in no particular order, are:
State the problem and application scope.
Provide use-cases.
Create project
wiki, forums, etc.
conference
Limit the scope of the effort.
Identify solution categories.
Identify solution selection criteria.
Testing and management requirements.
Measures, such as Performance and scalability.
Tools.
One possible conclusion is that there is no need for a ‘higher’ level concurrency support in Java or that this should evolve on its own before attempts at standardization are attempted.
Problem
One thing has been left out of the OO frenzy is the concept of a process. There are two manifestations of this: Methodology and Concurrency. In the methodological realm, ‘process’, which was part of data-flow analysis, is practically missing from modern analysis (as typified by OOP/UML derived methods). And even now, modular approaches, are still divorced from concurrency concerns. For example, OSGi fortunately brings back modularity to the Java environment. As stated by Kriens:
“Why is modular important? Well, if there is one consistent lesson in all our technology trends over the past 50 years then it is high cohesion and low coupling. Structured programming advocated this; OO forgot it for some time ….” — Peter Kriens, http://forum.springframework.org/showpost.php?p=138459&postcount=2
Yet, even in OSGi, a module is still at the mercy of unconstrained concurrency effects:
“OSGi … does not provide a thread of execution to each bundle. Events are delivered to bundles through certain interfaces, but no guarantees are made about which thread event delivery occurs on. Generally, event callbacks are required to finish quickly and should not make calls back into the OSGi framework to avoid possible deadlock. It is common, therefore, for bundles to start one or more threads in order to get work done.” — Oliver Goldman, “Multithreading, Java, & OSGi”
Welch calls this “unconstrained OO”:
“In unconstrained OO, threads and objects are orthogonal notions — sometimes dangerously competitive. Threads have no internal structure and can cross object boundaries in spaghetti-like trails that relate objects together in a way that has little correspondence to the original OO design. ” – Peter H. Welch in Javadocs for jcsp.lang.CSProcess of the JCSP library
And, others give similar critiques:
All object methods have to be invoked directly (or indirectly) by an external thread of control – they have to be caller-oriented (a somewhat curious property of so-called object oriented systems). — P.H Welsh (“Process Oriented Design for Java: Concurrency for All”)
“In standard OO, if you hold a reference to friend, and you wish to invoke the borrowMoney() method, then the call friend.borrowMoney() is executed in your own thread of control, not in a separate thread (or in friend’s thread), thus breaking all similarity to the way the real world works.” — (Oprean and Pederson, 2008).
“Although the development of parallel languages began around 1972, it did not stop here. Today we have three major communication paradigms: monitors, remote procedures, and message passing. Any one of them would have been a vast improvement over Java’s insecure variant of shared classes. As it is, Java ignores the last twenty-five years of research in parallel languages.”
— JAVA’S INSECURE PARALLELISM by Per Brinch Hansen (1999)
“However, I think Java needs to change. When it was first released, its number one competitor was C++, and Java walked all over it in terms of support for concurrency. Threads in the standard library! Syntactic support for critical blocks! Implicit locks in every object! Now, Java’s number one competitor is C#, which is increasingly starting to look like a functional language. Also languages like Erlang and Haskell have concurrency primitives that walk all over Java’s (I particularly love Haskell’s composable memory transactions). I hope this is something that is addressed in Java 7, but I fear it won’t be…” — Neil Bartlett
A higher level concurrency support in Java will allow a more approachable use of concurrent development. Though the concurrency implementation in Java is a vast improvment over what was available in a popular language at the time, we are now seeing more interests in other approaches as shown in Scala, Erlang, Kilim, Akka, GPars, Haskell, and others.
A standard would then allow, within reason, the sharing of development expertise and common patterns and idioms among the different languages and frameworks. For example, could an Actor in Scala be behaviorially the same as an Actor in plain old Java?
We are also seeing multiple processor cores being used to continue the improvment in thruput since thermal and process limititions have reduced growth. These will also allow true parallism. A high-end actual product, Sparc T3, the Sun Microelectronics version of the chip multi-threaded (CMT) processor, will be capable of forming a quad core SOC (System on Chip) that offers 512 hardware threads. Even on personal desktop systems, we already have quad cores and soon hexacores will be commonplace in the high end, such as the Intel Nehalem-EX. (Note that concurrency and parallelism are not the same thing).
Update: This was written a while back. In the interim, Oracle bought Sun and in the process canceled the Rock high-end CMT based processor design. Yet, Oracle said they will continue to support the upcoming SPARC CMT versions. Interesting discussion is found here. Intel and AMD have not stood still. AMD, for example, is promising a 16 core “Interlagos” chip soon. On the mobile side, ARM and others are also producing multicore systems.
Plain Old Concurrent Component
Instead of just reusing existing terms, I can call this new concurrency support a Concurrent Component. A component is more similar to an OS process, whereas internally it may incorporate light weight threads.
A possible conceptual view of a Component or Plain Old Concurrent Object (POCO) is shown in figure 1 below . Note that this is intentionally reminiscent of a Programmable Logic Device (PLD) block diagram. Each block is an optional ‘concern’ that crosscuts each object that is part of the component. The nested objects can share state and optionally will execute within a fine-grain concurrency kernel. The external control communication is via interfaces that expose the Component API, whereas the actual messaging embeds the application API that the component instantiation provides. This is somewhat of the flavor of Cox’s Software IC concept (ref?). This is also similar to various old Microsoft COM models. In future, it’s even conceivable that each POCO could have its own core assigned in an dynamically created application specific multicore processor implemented in nanoprocessor FPGA.
And, yes the diagram is just my brain’s core dump on the subject. Would require more work to explore this further, and determine what would really make sense.
Concurrent Component
Why isn’t this need met by existing specifications?
Currently, developers can use the concurrency control constructs that are provided in the Java language itself. Many experts consider these too low level for some applications and for use by the average developer (in terms of development effort, correctness, and failure potentials):
Use Erlang-Style Concurrency. The Java concurrency primitives like locks and synchronized have been proven to be too low level and often to hard to use. There are better ways to write concurrent code. Erlang Style concurrency is one of them – in Java there are many ways to achive this in Java – I’ve written about them here. Newer ones are Akka and Actorom. You can also use Join/Fork or the myriad of data structures in java.util.concurrent. —- Stephan Schmidt in Go Ahead: Next Generation Java Programming Style
In Java 5 and above one uses the new java.util.concurrent utilities. These offer a powerful and relatively accessible API such as the Executor and the Fork/Join frameworks. However, though the latter were a significant improvement, their use still require advanced skills and really only offer the means to create application specific or JVM language based concurrency frameworks.
Even with concurrency updates in Java 6 and Java 7, the Java language doesn’t make parallel programming particularly easy. Java threads, synchronized blocks, wait/ notify, and the java.util.concurrent package all have their place, but Java developers pressed to meet the capacity of multi-core systems are turning to techniques pioneered in other languages.
Some languages or libraries, such as Scala (?) and JCSP even use the underlying concurrency support in Java to create the respective ‘active’ object concurrency extensions. See for instance this presentation by Doug Lea, “Engineering Fine-Grained Parallelism Support for Java 7”.
Note that introducing a higher level of abstraction will not necessarily reduce the need for skilled engineering. In fact, it may increase that need until the patterns, methodologies, measurement, and tool sets catch up and support the new framework. Of course, many other concerns are important, such as performance.
Underlying technology or technologies:
There is a rich history in both industrial and academic research into the theory and practices of concurrency. In the fifty plus years of this, surely there must be more that can be used in Java then Monitors, semaphores and other low level tools.
Some examples are Communicating Sequential Processes (CSP), Calculus of communicating systems (CCS), and all the ‘proven’ concurrent support already found in other languages, among them, Occam, Go, Erlang, Haskell, Esterel, Scala, and many others in the academic research community. There is even one observation that the current Monitor implementation in Java itself could more fully embrace the original theoretical Monitor concept.
Of course, as in anything else, there is hype and fashion in software too. Thus, some approaches are touted as fixing deadlocks and other problems, when they don’t.
· Michele Simionato, “Threads, processes and concurrency in Python: some thoughts”, [weblog entry.] The Explorer. 26 Jul 2010. (http://www.artima.com/weblogs/viewpost.jsp?thread=299551). 26 Jul 2010.
· Orlic, Bojan; “SystemCSP : a graphical language for designing concurrent component-based embedded control systems”. (2007) thesis. http://doc.utwente.nl/58009/
· “Gartner Says as the Number of Processors Swells Inside Servers, Organizations May Not Be Able to Use All Processors Thrust on Them”, http://www.gartner.com/it/page.jsp?id=867112
· “Tilera vs. Godzilla”, Louis Savain, http://rebelscience.blogspot.com/2007/08/tilera-vs-godzilla.html
· Irfan Pyarali, Tim Harrison, and Douglas C. Schmidt Thomas D. Jordan; “Proactor: “An Object Behavioral Pattern for Demultiplexing and Dispatching Handlers for Asynchronous Events”, http://www.cs.wustl.edu/~schmidt/PDF/proactor.pdf
· “A Component-Oriented Model for the Design of Safe Multi-threaded Applications”, Reimer Behrends, R. E. K. Stirewalt and L. K. Dillon Dept. of Computer Science and Engineering Michigan State University, accessed on 3/10/2009 at http://www.cse.msu.edu/~stire/Papers/cbse05.pdf
· “Integrating and Extending JCSP”, Peter WELCHa, Neil BROWNa, James MOORES b, Kevin CHALMERS c and Bernhard SPUTH. Communicating Process Architectures 2007. Accessed 2007 at xxxx.
· Yuri Gurevich, Wolfram Schulte, and Charles Wallace, “Investigating Java Concurrency using Abstract State Machines”. In Y. Gurevich, P. Kutter, M. Odersky, and L. Thiele, eds., Abstract State Machines: Theory and Applications, Springer LNCS 1912, 2000, 151-176.
· P. Brinch Hansen, A keynote address on concurrent programming. Keynote address for the IEEE Computer Software & Applications Conference, Chicago, IL, November 1978. Computer 12, 5 (May 1979), 50{56. Copyright °c 1979, Institute of Electrical and Electronics Engineers, Inc., accessed on 3/3/09 at http://brinch-hansen.net/papers/1978c.pdf
· “Concurrency: The Next Generation”, Damian Dimmich, Christian Jacobsen and Matthew Jadud, Computing Laboratory University of Kent Canterbury, CT2 7NZ, accessed on 3/3/09 at http://www.cs.kent.ac.uk/pubs/2006/2522/content.pdf
· “A type safe state abstraction for coordination in Java-like languages”,Ferruccio Damiani, Elena Giachino, Paola Giannini, and Sophia Drossopoulou,Acta Informatica, Volume 45, Numbers 7-8 / December, 2008, http://www.springerlink.com/content/2168q630k2355563/
Books
· The Art of Concurrency: A thread Monkey’s Guide to Writing Parallel Applications, Clay Breshears, O’Reilly Media, May 2009. 978-0-596-52153-0. http://oreilly.com/catalog/9780596521547
· Concurrency: State Models & Java Programs, Jeff Magee and Jeff Kramer
· Java Concurrency in Practice, Brian Goetz
· Concurrent Programming in Java, Doug Lea,
· Java Thread Programming, Paul Hyde,
· Java Threads, Oaks & Wong,
· Multithreaded Programming with Windows NT, Thuan Q. Pham and Pankaj K. Garg