Flow Streams

RxJS is overkill for most apps. Get reactive programming that doesn't require a PhD to understand.

The Reactive Programming Dilemma

You need reactive streams, but RxJS feels like bringing a spaceship to drive to work.

The RxJS Learning Cliff

// "Simple" WebSocket handling with RxJS
import { webSocket } from 'rxjs/webSocket';
import {
  retry, retryWhen, delay, scan, catchError,
  switchMap, exhaustMap, mergeMap, concatMap,
  debounceTime, distinctUntilChanged, takeUntil,
  shareReplay, startWith, tap, filter
} from 'rxjs/operators';

const socket$ = webSocket('ws://localhost:8080').pipe(
  retryWhen(errors$ =>
    errors$.pipe(
      scan((acc, error) => ({ ...acc, count: acc.count + 1 }), { count: 0 }),
      tap(({ count }) => console.log(`Retry #${count}`)),
      delay(1000),
      filter(({ count }) => count < 5)
    )
  ),
  shareReplay({ bufferSize: 1, refCount: true })
);

// Your team: "What does switchMap do again?"
// You: "It's like flatMap but cancels previous... wait no that's exhaustMap"
// New dev: "I'll just use promises"

// 200+ operators. Marble diagrams. Hot vs Cold confusion.
// Subscription management. Memory leak paranoia.
// Is this really better than callbacks?

Flow: Reactive Programming for Humans

import { callbackFlow } from 'kotlinify-ts/flow';
import { delay } from 'kotlinify-ts/coroutines';

// WebSocket with auto-reconnect - actually readable
const socketFlow = callbackFlow<Message>(scope => {
  const ws = new WebSocket('ws://localhost:8080');

  ws.onmessage = (e) => scope.emit(JSON.parse(e.data));
  ws.onerror = () => ws.close();
  ws.onclose = () => scope.close();

  scope.onClose(() => ws.close());
});

// Auto-retry with backoff
const reliableSocket = socketFlow.retryWhen((error, attempt) => {
  if (attempt < 5) {
    console.log(`Retry #${attempt}: ${error.message}`);
    return delay(1000 * attempt).then(() => true);
  }
  return false;
});

// Use it - no PhD required
await reliableSocket
  .filter(msg => msg.type === 'update')
  .map(msg => msg.data)
  .collect(data => updateUI(data));

// That's it. No marble diagrams. No subscription management.
// Just data flowing through transformations.

RxJS Complexity

  • • 200+ operators to learn
  • • Subscription management
  • • Scheduler confusion
  • • Memory leak footguns

Manual Event Handling

  • • Callback hell returns
  • • No composition
  • • Manual cleanup everywhere
  • • State synchronization bugs

Flow Balance

  • • 30 intuitive operators
  • • Auto cleanup with scopes
  • • Built-in backpressure
  • • StateFlow for UI state

Real-World Flow Use Cases

UI State Management

StateFlow replaces Redux/MobX with 10× less boilerplate. One source of truth, automatic updates.

WebSocket Streams

Handle real-time data with automatic reconnection, buffering, and error recovery.

Event Processing

Debounce searches, throttle scrolls, combine user inputs - without the complexity.

Data Pipelines

Transform API responses through multiple stages with backpressure handling.

Creating Flows

Multiple ways to create flows from values, generators, arrays, or callbacks.

import { flow, flowOf, asFlow, callbackFlow } from 'kotlinify-ts/flow';

// From values
await flowOf(1, 2, 3, 4, 5)
  .map(x => x * 2)
  .filter(x => x > 5)
  .collect(console.log); // 6, 8, 10

// From async generator
const timerFlow = flow(async function* () {
  for (let i = 0; i < 5; i++) {
    yield i;
    await new Promise(r => setTimeout(r, 1000));
  }
});

await timerFlow
  .map(x => `Tick ${x}`)
  .collect(console.log); // Tick 0, Tick 1, ...

// From arrays or iterables
await asFlow([1, 2, 3, 4, 5])
  .filter(x => x % 2 === 0)
  .collect(console.log); // 2, 4

// From callbacks (for event streams)
const clickFlow = callbackFlow<MouseEvent>(scope => {
  const handler = (e: MouseEvent) => scope.emit(e);
  document.addEventListener('click', handler);

  scope.onClose(() => {
    document.removeEventListener('click', handler);
  });
});

await clickFlow
  .map(e => ({ x: e.clientX, y: e.clientY }))
  .collect(pos => console.log(`Clicked at ${pos.x}, ${pos.y}`));

Transformation Operators

Rich set of operators for transforming, filtering, and controlling flow emissions.

import { flowOf } from 'kotlinify-ts/flow';

// Basic transformations
await flowOf(1, 2, 3, 4, 5)
  .map(x => x * x)                    // Square each value
  .filter(x => x > 10)                // Keep only > 10
  .take(2)                             // Take first 2
  .collect(console.log);               // 16, 25

// Distinct and debouncing
await userInputFlow()
  .debounce(300)                      // Wait 300ms of silence before emitting
  .distinctUntilChanged()              // Skip consecutive duplicates
  .retryWhen((error, attempt) => attempt < 3) // Resilient retries
  .map(query => searchAPI(query))
  .collect(results => displayResults(results));

// Throttling for rate limiting
await scrollEventFlow()
  .throttle(100)                      // Max 1 emission per 100ms
  .map(e => window.scrollY)
  .distinctUntilChanged()
  .collect(position => updateScrollIndicator(position));

// Sampling periodic snapshots
await sensorDataFlow()
  .sample(1000)                       // Emit latest value every second
  .map(reading => reading.temperature)
  .collect(temp => updateDisplay(temp));

// Buffering for batch processing
await dataStream()
  .buffer(100)                        // Process in batches of 100
  .map(batch => processBatch(batch))
  .collect(results => saveResults(results));

Try it yourself:

// Basic flow transformations
await flowOf(1, 2, 3, 4, 5, 6)
  .map(x => x * 2)
  .filter(x => x > 5)
  .take(3)
  .collect(x => console.log('Value:', x));

console.log('---');

// Create a simple flow
const numbers = flowOf(10, 20, 30, 40);
await numbers.collect(n => console.log('Number:', n));

Advanced Transformations

Powerful operators for complex flow transformations and flattening strategies.

import { flowOf, flow } from 'kotlinify-ts/flow';

// Custom transformation with multiple emissions
await flowOf(1, 2, 3)
  .transform(async (value, emit) => {
    await emit(value);
    await emit(value * 10);
    await emit(value * 100);
  })
  .collect(console.log); // 1, 10, 100, 2, 20, 200, 3, 30, 300

// Scan for running calculations
await flowOf(1, 2, 3, 4, 5)
  .scan(0, (sum, value) => sum + value)
  .collect(console.log); // 0, 1, 3, 6, 10, 15

// Add index to values
await flowOf('a', 'b', 'c')
  .withIndex()
  .collect(([index, value]) =>
    console.log(`${index}: ${value}`)
  ); // 0: a, 1: b, 2: c

// FlatMap strategies
const searchFlow = (query: string) => flow(async function* () {
  const results = await searchAPI(query);
  for (const result of results) yield result;
});

// flatMapConcat: Process sequentially
await flowOf('kotlin', 'typescript', 'rust')
  .flatMapConcat(query => searchFlow(query))
  .collect(console.log); // All kotlin results, then typescript, then rust

// flatMapMerge: Process in parallel (with concurrency limit)
await flowOf('kotlin', 'typescript', 'rust')
  .flatMapMerge(2, query => searchFlow(query))
  .collect(console.log); // Results arrive as they complete, max 2 concurrent

// flatMapLatest: Cancel previous on new emission
await userInputFlow()
  .debounce(300) // Wait for idle input before switching requests
  .flatMapLatest(query => searchFlow(query))
  .collect(displayResults); // Cancels previous search on new input

StateFlow - Reactive State Management

StateFlow holds a single value and emits updates to all collectors. Perfect for UI state management.

import { MutableStateFlow } from 'kotlinify-ts/flow';

// Create reactive state
const counter = new MutableStateFlow(0);
const user = new MutableStateFlow<User | null>(null);
const theme = new MutableStateFlow<'light' | 'dark'>('light');

// Read current value synchronously
console.log(counter.value); // 0

// Update state
counter.value = 5;
counter.update(current => current + 1); // Functional update
user.value = { id: 1, name: 'Alice' };

// Compare and set atomically
const wasUpdated = counter.compareAndSet(6, 10);
console.log(wasUpdated); // true if was 6, false otherwise

// Subscribe to changes (always emits current value first)
await counter.collect(count => {
  console.log(`Count: ${count}`);
}); // Immediately emits 10, then future updates

// Combine multiple states
const appState = new MutableStateFlow({
  user: null as User | null,
  theme: 'light' as Theme,
  notifications: [] as Notification[]
});

// React component example
function useFlowState<T>(flow: StateFlow<T>) {
  const [value, setValue] = useState(flow.value);

  useEffect(() => {
    const job = launch(() =>
      flow.collect(setValue)
    );
    return () => job.cancel();
  }, [flow]);

  return value;
}

// WebSocket connection state
const connectionState = new MutableStateFlow<
  'disconnected' | 'connecting' | 'connected' | 'error'
>('disconnected');

websocket.onopen = () => connectionState.value = 'connected';
websocket.onerror = () => connectionState.value = 'error';
websocket.onclose = () => connectionState.value = 'disconnected';

await connectionState
  .distinctUntilChanged()
  .collect(state => updateConnectionUI(state));

SharedFlow - Event Broadcasting

SharedFlow broadcasts values to multiple collectors without state. Configurable replay and buffering.

import { MutableSharedFlow, SharedFlow } from 'kotlinify-ts/flow';

// Basic event bus
const events = new MutableSharedFlow<AppEvent>();

// With replay cache (new subscribers get last N values)
const messages = new MutableSharedFlow<Message>({
  replay: 5  // New collectors get last 5 messages
});

// With buffer for handling slow collectors
const dataStream = new MutableSharedFlow<Data>({
  replay: 0,
  extraBufferCapacity: 64,
  onBufferOverflow: 'DROP_OLDEST' // or 'DROP_LATEST', 'SUSPEND'
});

// Emit events
await events.emit({ type: 'user-login', userId: 123 });
events.tryEmit({ type: 'page-view', path: '/home' }); // Non-suspending

// Multiple collectors
await Promise.all([
  events.collect(e => logEvent(e)),
  events.collect(e => sendAnalytics(e)),
  events.collect(e => updateUI(e))
]);

// Check active subscriptions
console.log(events.subscriptionCount); // 3

// Access replay cache
console.log(messages.replayCache); // Array of last N messages

// Clean up
events.resetReplayCache();
events.cancelAll(); // Cancel all collectors

// Real-world: WebSocket message distribution
const wsMessages = new MutableSharedFlow<WSMessage>({ replay: 1 });

websocket.onmessage = (e) => {
  const message = JSON.parse(e.data);
  wsMessages.tryEmit(message);
};

// Different components subscribe to different message types
await wsMessages
  .filter(msg => msg.type === 'chat')
  .collect(msg => displayChatMessage(msg));

await wsMessages
  .filter(msg => msg.type === 'notification')
  .collect(msg => showNotification(msg));

Combining Flows

Combine multiple flows to create complex reactive data pipelines.

import { combine, zip, merge, flowOf } from 'kotlinify-ts/flow';

// Combine: Emits when any flow emits (after all have emitted once)
const userFlow = new MutableStateFlow(currentUser);
const settingsFlow = new MutableStateFlow(currentSettings);
const themeFlow = new MutableStateFlow(currentTheme);

await combine(
  userFlow,
  settingsFlow,
  (user, settings) => ({ user, settings })
)
  .collect(state => renderApp(state));
// Emits whenever user OR settings change

// Zip: Pairs values from flows
const questionsFlow = flowOf('Name?', 'Age?', 'City?');
const answersFlow = flowOf('Alice', '25', 'NYC');

await zip(
  questionsFlow,
  answersFlow,
  (q, a) => `${q} ${a}`
)
  .collect(console.log);
// "Name? Alice", "Age? 25", "City? NYC"

// Merge: Combine emissions from multiple flows
const mouseFlow = callbackFlow(/* mouse events */);
const keyboardFlow = callbackFlow(/* keyboard events */);
const touchFlow = callbackFlow(/* touch events */);

await merge(mouseFlow, keyboardFlow, touchFlow)
  .throttle(50)
  .collect(event => handleUserInput(event));

// Real-world: Form validation
const emailFlow = new MutableStateFlow('');
const passwordFlow = new MutableStateFlow('');

await combine(
  emailFlow,
  passwordFlow,
  (email, password) => ({
    email,
    password,
    isValid: email.includes('@') && password.length >= 8
  })
)
  .distinctUntilChangedBy(state => state.isValid)
  .collect(state => {
    submitButton.disabled = !state.isValid;
  });

Terminal Operations

Collect flow values into different forms or compute aggregations.

import { flowOf } from 'kotlinify-ts/flow';

// Collect to collections
const array = await flowOf(1, 2, 3).toArray();        // [1, 2, 3]
const list = await flowOf(1, 2, 3).toList();         // [1, 2, 3]
const set = await flowOf(1, 1, 2, 3).toSet();        // Set{1, 2, 3}

// First and last
const first = await flowOf(1, 2, 3).first();         // 1
const last = await flowOf(1, 2, 3).last();           // 3
const firstOrNull = await flowOf<number>().firstOrNull(); // null

// Single value (throws if not exactly one)
const single = await flowOf(42).single();            // 42
const singleOrNull = await flowOf(1, 2).singleOrNull(); // null

// Aggregations
const sum = await flowOf(1, 2, 3, 4)
  .reduce((a, b) => a + b);                          // 10

const product = await flowOf(1, 2, 3, 4)
  .fold(1, (acc, value) => acc * value);             // 24

const count = await flowOf(1, 2, 3).count();         // 3

// Predicates
const hasEven = await flowOf(1, 3, 5, 6)
  .any(x => x % 2 === 0);                            // true

const allPositive = await flowOf(1, 2, 3)
  .all(x => x > 0);                                  // true

const noneNegative = await flowOf(1, 2, 3)
  .none(x => x < 0);                                 // true

// Real-world: API response validation
const isValid = await apiResponseFlow()
  .map(response => response.data)
  .all(data => data.status === 'success');

const errorCount = await logFlow()
  .filter(log => log.level === 'ERROR')
  .count();

Error Handling & Resilience

Handle errors gracefully with retry strategies and fallback mechanisms.

import { flow, flowOf } from 'kotlinify-ts/flow';

// Catch and handle errors
await flow(async function* () {
  yield 1;
  yield 2;
  throw new Error('Something went wrong');
  yield 3; // Never reached
})
  .catch(error => console.error('Caught:', error.message))
  .collect(console.log); // 1, 2, then error logged

// Retry with fixed attempts
await fetchDataFlow()
  .retry(3) // Retry up to 3 times on error
  .collect(data => processData(data));

// Retry with custom logic
await apiFlow()
  .retryWhen((error, attempt) => {
    if (error.code === 'RATE_LIMIT') {
      return attempt < 5; // Retry rate limits up to 5 times
    }
    return false; // Don't retry other errors
  })
  .collect(response => handleResponse(response));

// Default values for empty flows
await possiblyEmptyFlow()
  .defaultIfEmpty({ id: 0, name: 'Default' })
  .collect(console.log); // Emits default if flow is empty

// Handle empty flows
await searchFlow(query)
  .onEmpty(() => console.log('No results found'))
  .collect(result => displayResult(result));

// Lifecycle hooks for cleanup
await resourceFlow()
  .onStart(() => console.log('Starting flow'))
  .onEach(value => console.log('Processing:', value))
  .onCompletion(() => console.log('Flow completed'))
  .catch(error => console.error('Flow error:', error))
  .collect(value => processValue(value));

// Real-world: Resilient API polling
const pollAPI = () => flow(async function* () {
  while (true) {
    try {
      const data = await fetchAPI();
      yield data;
    } catch (error) {
      console.error('Poll failed:', error);
      yield null; // Emit null on error
    }
    await new Promise(r => setTimeout(r, 5000));
  }
});

await pollAPI()
  .filter(data => data !== null)
  .retryWhen((error, attempt) => {
    const delay = Math.min(1000 * Math.pow(2, attempt), 30000);
    return new Promise(resolve =>
      setTimeout(() => resolve(attempt < 10), delay)
    );
  })
  .collect(data => updateDashboard(data));

Real-World Examples

Practical patterns for common reactive programming scenarios.

import { MutableStateFlow, MutableSharedFlow, callbackFlow, flow, combine } from 'kotlinify-ts/flow';

// 1. Search with debounce and cancellation
const searchInput = new MutableStateFlow('');

searchInput
  .debounce(300) // Wait for typing to pause
  .filter(query => query.length > 2)
  .distinctUntilChanged()
  .flatMapLatest(query =>
    flow(async function* () {
      const results = await searchAPI(query);
      yield* results;
    })
  )
  .catch(error => {
    console.error('Search failed:', error);
    return [];
  })
  .collect(results => renderSearchResults(results));

// 2. WebSocket reconnection with exponential backoff
const createWebSocketFlow = () => callbackFlow<MessageEvent>(scope => {
  let ws: WebSocket;
  let reconnectDelay = 1000;

  const connect = () => {
    ws = new WebSocket('wss://api.example.com');

    ws.onmessage = (e) => scope.emit(e);
    ws.onopen = () => {
      console.log('Connected');
      reconnectDelay = 1000; // Reset delay
    };
    ws.onerror = (e) => console.error('WebSocket error:', e);
    ws.onclose = () => {
      if (scope.isActive) {
        setTimeout(connect, reconnectDelay);
        reconnectDelay = Math.min(reconnectDelay * 2, 30000);
      }
    };
  };

  connect();
  scope.onClose(() => ws?.close());
});

await createWebSocketFlow()
  .map(e => JSON.parse(e.data))
  .filter(msg => msg.type === 'data')
  .collect(data => processRealtimeData(data));

// 3. Infinite scroll with pagination
const scrollFlow = callbackFlow<number>(scope => {
  const handler = () => {
    const { scrollTop, scrollHeight, clientHeight } = document.documentElement;
    if (scrollTop + clientHeight >= scrollHeight - 100) {
      scope.emit(Math.floor(scrollTop / 1000)); // Page number
    }
  };

  window.addEventListener('scroll', handler);
  scope.onClose(() => window.removeEventListener('scroll', handler));
});

const currentPage = new MutableStateFlow(0);

await scrollFlow
  .distinctUntilChanged()
  .collect(page => {
    currentPage.value = page;
    loadMoreContent(page);
  });

// 4. Real-time collaborative editing
interface Edit { userId: string; changes: any; timestamp: number; }

const localEdits = new MutableSharedFlow<Edit>();
const remoteEdits = new MutableSharedFlow<Edit>();
const documentState = new MutableStateFlow(initialDocument);

await merge(localEdits, remoteEdits)
  .scan(initialDocument, (doc, edit) => applyEdit(doc, edit))
  .distinctUntilChanged()
  .collect(doc => {
    documentState.value = doc;
    renderDocument(doc);
  });

// 5. System monitoring dashboard
const cpuFlow = flow(async function* () {
  while (true) {
    yield await getCPUUsage();
    await new Promise(r => setTimeout(r, 1000));
  }
});

const memoryFlow = flow(async function* () {
  while (true) {
    yield await getMemoryUsage();
    await new Promise(r => setTimeout(r, 1000));
  }
});

await combine(
  cpuFlow.throttle(5000),
  memoryFlow.throttle(5000),
  (cpu, memory) => ({ cpu, memory, timestamp: Date.now() })
)
  .scan([], (history, stats) =>
    [...history.slice(-59), stats].slice(-60) // Keep last minute
  )
  .collect(history => renderChart(history));

Best Practices

Guidelines for effective Flow usage in production applications.

✓ Do

  • • Use StateFlow for UI state that needs current value access
  • • Use SharedFlow for events without state
  • • Apply backpressure strategies (buffer, conflate, sample) for high-frequency streams
  • • Clean up with onClose() in callbackFlow
  • • Use distinctUntilChanged() to avoid unnecessary updates
  • • Prefer cold flows for one-time operations

✗ Don't

  • • Don't forget to handle errors with catch()
  • • Don't create flows in tight loops
  • • Don't ignore backpressure in high-frequency streams
  • • Don't forget to cancel collectors when done
  • • Don't use SharedFlow when you need the current value

Next Steps