// Purpose.  Piped streams  [source: Horstmann, vol2, pp114-115]
//
// Discussion.  Whenever you have a "producer-consumer" pair, the possibility
// of "impedance mismatch" exists as each performs its duties at potentially
// different speeds.  If the consumer is working faster than the producer,
// you would like the consumer to "block" until input is available.  If the
// producer is working faster than the consumer, you would like the producer
// to "block" when the queueing mechanism is full.  "Java has a convenient
// set of classes (PipedOutputStream and PipedInputStream) to implement this
// communication pattern.  The principal reason to use pipes is to keep each
// thread simple.  The producer thread simply writes its output to a stream
// and forgets about it.  The consumer thread simply reads input from a stream
// without having to care where it comes from.  By using pipes, multiple
// threads can be connected with each other."  [Horstmann, vol2, p113]

import java.io.*;

// First stage of the pipeline: once a second, writes a random int in the
// range [0, 100) to the supplied stream and echoes it to standard output.
// Runs until the thread is interrupted or the stream can no longer be
// written (for example because the reading side closed the pipe).
class Producer extends Thread {
    private DataOutputStream out;

    // os - the stream that receives the generated ints (binary, 4 bytes each)
    public Producer( OutputStream os ) {
        out = new DataOutputStream( os );
    }

    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                int val = (int) (Math.random() * 100);
                System.out.print( ((val < 10) ? " " : "") + val );
                System.out.flush();
                out.writeInt( val );
                // flush so a reader blocked on the pipe is woken immediately
                out.flush();
                sleep( 1000 );
            }
        } catch( InterruptedException e ) {
            // Interruption is the request to stop: keep the flag set for
            // whoever inspects this thread and fall through to the cleanup.
            Thread.currentThread().interrupt();
        } catch( IOException e ) {
            // The stream is unusable; report it once and stop instead of
            // retrying the same failing write forever.
            e.printStackTrace();
        } finally {
            // Closing lets the downstream reader see end-of-stream.
            try { out.close(); } catch( IOException e ) { e.printStackTrace(); }
        }
    }
}

// Middle stage of the pipeline: reads ints, keeps a running total and count,
// echoes "total / count" to standard output and writes the running average
// (as a double) to the output stream.  Stops when the input reaches
// end-of-stream or either stream fails.
class Filter extends Thread {
    private DataInputStream  inn;
    private DataOutputStream out;
    private long total = 0;   // long so a long-running pipeline cannot overflow
    private int  count = 0;

    // is - the stream the ints are read from
    // os - the stream the running averages are written to
    public Filter( InputStream is, OutputStream os ) {
        inn = new DataInputStream( is );
        out = new DataOutputStream( os );
    }

    public void run() {
        try {
            while (true) {
                int val = inn.readInt();
                total += val;
                count++;
                double avg = (double) total / count;
                String just = (total < 100) ? " " : "";
                System.out.print( " => " + just + total + " / " + count );
                System.out.flush();
                out.writeDouble( avg );
                // flush so the next stage is notified without waiting for
                // the pipe's internal polling interval
                out.flush();
            }
        } catch( EOFException endOfInput ) {
            // The upstream writer closed its end: normal termination.
        } catch( IOException e ) {
            // Broken pipe or similar; report once and stop.
            e.printStackTrace();
        } finally {
            // Close both ends so the neighbouring stages can terminate too.
            try { inn.close(); } catch( IOException e ) { e.printStackTrace(); }
            try { out.close(); } catch( IOException e ) { e.printStackTrace(); }
        }
    }
}

// Last stage of the pipeline: reads doubles and prints each one, preceded by
// " => ", on standard output.  Stops when the input reaches end-of-stream or
// fails.
class Consumer extends Thread {
    private DataInputStream in;

    // is - the stream the averages are read from
    public Consumer( InputStream is ) {
        in = new DataInputStream( is );
    }

    public void run() {
        try {
            while (true) {
                double avg = in.readDouble();
                System.out.println( " => " + avg );
                System.out.flush();
            }
        } catch( EOFException endOfInput ) {
            // The upstream writer closed its end: normal termination.
        } catch( IOException e ) {
            // Broken pipe or similar; report once and stop.
            e.printStackTrace();
        } finally {
            try { in.close(); } catch( IOException e ) { e.printStackTrace(); }
        }
    }
}

// Wires Producer -> Filter -> Consumer together with two pipes and starts
// the three threads.
public class ThreadPipedStreams {
    public static void main( String[] args ) {
        try {
            // pipe 1 carries ints from the producer to the filter
            PipedOutputStream pout1 = new PipedOutputStream();
            PipedInputStream  pinn1 = new PipedInputStream( pout1 );

            // pipe 2 carries doubles from the filter to the consumer
            PipedOutputStream pout2 = new PipedOutputStream();
            PipedInputStream  pinn2 = new PipedInputStream( pout2 );

            Producer prod = new Producer( pout1 );
            Filter   filt = new Filter( pinn1, pout2 );
            Consumer cons = new Consumer( pinn2 );

            prod.start();
            filt.start();
            cons.start();
        } catch( IOException e ) {
            // connecting a pipe failed; nothing was started
            e.printStackTrace();
        }
    }
}

// 75 => 75 / 1 => 75.0
// 51 => 126 / 2 => 63.0
// 64 => 190 / 3 => 63.333333333333336
// 88 => 278 / 4 => 69.5
// 21 => 299 / 5 => 59.8
// 9 => 308 / 6 => 51.333333333333336
// 82 => 390 / 7 => 55.714285714285715
// 69 => 459 / 8 => 57.375
// 61 => 520 / 9 => 57.77777777777778