Flow Streams
RxJS is overkill for most apps. Get reactive programming that doesn't require a PhD to understand.
The Reactive Programming Dilemma
You need reactive streams, but RxJS feels like commuting to work in a spaceship.
The RxJS Learning Cliff
// "Simple" WebSocket handling with RxJS
import { webSocket } from 'rxjs/webSocket';
import {
retry, retryWhen, delay, scan, catchError,
switchMap, exhaustMap, mergeMap, concatMap,
debounceTime, distinctUntilChanged, takeUntil,
shareReplay, startWith, tap, filter
} from 'rxjs/operators';
const socket$ = webSocket('ws://localhost:8080').pipe(
retryWhen(errors$ =>
errors$.pipe(
scan((acc, error) => ({ ...acc, count: acc.count + 1 }), { count: 0 }),
tap(({ count }) => console.log(`Retry #${count}`)),
delay(1000),
filter(({ count }) => count < 5)
)
),
shareReplay({ bufferSize: 1, refCount: true })
);
// Your team: "What does switchMap do again?"
// You: "It's like flatMap but cancels previous... wait no that's exhaustMap"
// New dev: "I'll just use promises"
// 200+ operators. Marble diagrams. Hot vs Cold confusion.
// Subscription management. Memory leak paranoia.
// Is this really better than callbacks?
Flow: Reactive Programming for Humans
import { callbackFlow } from 'kotlinify-ts/flow';
import { delay } from 'kotlinify-ts/coroutines';
// WebSocket with auto-reconnect - actually readable
const socketFlow = callbackFlow<Message>(scope => {
const ws = new WebSocket('ws://localhost:8080');
ws.onmessage = (e) => scope.emit(JSON.parse(e.data));
ws.onerror = () => ws.close();
ws.onclose = () => scope.close();
scope.onClose(() => ws.close());
});
// Auto-retry with backoff
const reliableSocket = socketFlow.retryWhen((error, attempt) => {
if (attempt < 5) {
console.log(`Retry #${attempt}: ${error.message}`);
return delay(1000 * attempt).then(() => true);
}
return false;
});
// Use it - no PhD required
await reliableSocket
.filter(msg => msg.type === 'update')
.map(msg => msg.data)
.collect(data => updateUI(data));
// That's it. No marble diagrams. No subscription management.
// Just data flowing through transformations.
RxJS Complexity
- 200+ operators to learn
- Subscription management
- Scheduler confusion
- Memory leak footguns
Manual Event Handling
- Callback hell returns
- No composition
- Manual cleanup everywhere
- State synchronization bugs
Flow Balance
- 30 intuitive operators
- Auto cleanup with scopes
- Built-in backpressure
- StateFlow for UI state
Real-World Flow Use Cases
UI State Management
StateFlow replaces Redux/MobX with 10× less boilerplate. One source of truth, automatic updates.
WebSocket Streams
Handle real-time data with automatic reconnection, buffering, and error recovery.
Event Processing
Debounce searches, throttle scrolls, combine user inputs - without the complexity.
Data Pipelines
Transform API responses through multiple stages with backpressure handling.
Creating Flows
Multiple ways to create flows from values, generators, arrays, or callbacks.
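To make the idea of a flow concrete before the library examples, here is a minimal sketch in plain TypeScript that does not use kotlinify-ts at all. `MiniFlow` and `miniFlowOf` are hypothetical names invented for this illustration; the sketch only assumes that a flow is "cold", meaning the source runs again for every `collect()` call.

```typescript
// Hypothetical miniature of a cold flow. This is NOT the kotlinify-ts implementation.
class MiniFlow<T> {
  private source: () => AsyncIterable<T>;

  // The factory runs once per collect(), which is what makes the flow "cold".
  constructor(source: () => AsyncIterable<T>) {
    this.source = source;
  }

  map<R>(fn: (value: T) => R): MiniFlow<R> {
    const source = this.source;
    return new MiniFlow<R>(async function* () {
      for await (const value of source()) yield fn(value);
    });
  }

  filter(predicate: (value: T) => boolean): MiniFlow<T> {
    const source = this.source;
    return new MiniFlow<T>(async function* () {
      for await (const value of source()) {
        if (predicate(value)) yield value;
      }
    });
  }

  // Terminal operation: nothing runs until collect() is called.
  async collect(action: (value: T) => void): Promise<void> {
    for await (const value of this.source()) action(value);
  }
}

// Build a flow from plain values, similar in spirit to flowOf(...).
function miniFlowOf<T>(...values: T[]): MiniFlow<T> {
  return new MiniFlow<T>(async function* () {
    yield* values;
  });
}

async function demo(): Promise<number[]> {
  const seen: number[] = [];
  await miniFlowOf(1, 2, 3, 4, 5)
    .map(x => x * 2)
    .filter(x => x > 5)
    .collect(x => { seen.push(x); });
  return seen; // resolves to [6, 8, 10]
}
```

Because each operator wraps the previous factory instead of running it, building a pipeline costs nothing until a collector shows up.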
import { flow, flowOf, asFlow, callbackFlow } from 'kotlinify-ts/flow';
// From values
await flowOf(1, 2, 3, 4, 5)
.map(x => x * 2)
.filter(x => x > 5)
.collect(console.log); // 6, 8, 10
// From async generator
const timerFlow = flow(async function* () {
for (let i = 0; i < 5; i++) {
yield i;
await new Promise(r => setTimeout(r, 1000));
}
});
await timerFlow
.map(x => `Tick ${x}`)
.collect(console.log); // Tick 0, Tick 1, ...
// From arrays or iterables
await asFlow([1, 2, 3, 4, 5])
.filter(x => x % 2 === 0)
.collect(console.log); // 2, 4
// From callbacks (for event streams)
const clickFlow = callbackFlow<MouseEvent>(scope => {
const handler = (e: MouseEvent) => scope.emit(e);
document.addEventListener('click', handler);
scope.onClose(() => {
document.removeEventListener('click', handler);
});
});
await clickFlow
.map(e => ({ x: e.clientX, y: e.clientY }))
.collect(pos => console.log(`Clicked at ${pos.x}, ${pos.y}`));
Transformation Operators
Rich set of operators for transforming, filtering, and controlling flow emissions.
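Debounce and throttle are the two operators people mix up most, so here is a standalone sketch of the difference on a recorded timeline. `TimedEvent`, `debounceEvents` and `throttleEvents` are hypothetical helpers, not part of kotlinify-ts, and the throttle shown is the leading-edge variant (an assumption about which variant you want). Events are assumed to be sorted by time.

```typescript
// Standalone illustration of debounce vs. throttle; hypothetical helpers only.
interface TimedEvent<T> {
  at: number; // time of the event in milliseconds
  value: T;
}

// Debounce: keep an event only if no other event follows within `ms`.
function debounceEvents<T>(events: TimedEvent<T>[], ms: number): T[] {
  const out: T[] = [];
  events.forEach((current, i) => {
    const next = events[i + 1];
    if (next === undefined || next.at - current.at >= ms) out.push(current.value);
  });
  return out;
}

// Throttle (leading edge): emit, then ignore everything for the next `ms`.
function throttleEvents<T>(events: TimedEvent<T>[], ms: number): T[] {
  const out: T[] = [];
  let lastEmitted = -Infinity;
  for (const current of events) {
    if (current.at - lastEmitted >= ms) {
      out.push(current.value);
      lastEmitted = current.at;
    }
  }
  return out;
}

const typing: TimedEvent<string>[] = [
  { at: 0, value: 'k' },
  { at: 100, value: 'ko' },
  { at: 200, value: 'kot' },
  { at: 900, value: 'kotl' },
];

console.log(debounceEvents(typing, 300)); // [ 'kot', 'kotl' ]
console.log(throttleEvents(typing, 300)); // [ 'k', 'kotl' ]
```

Debounce waits for silence and so favors the last value of a burst; throttle reacts right away and then rate-limits, which is why it suits scroll handlers better than search boxes.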
import { flowOf } from 'kotlinify-ts/flow';
// Basic transformations
await flowOf(1, 2, 3, 4, 5)
.map(x => x * x) // Square each value
.filter(x => x > 10) // Keep only > 10
.take(2) // Take first 2
.collect(console.log); // 16, 25
// Distinct and debouncing
await userInputFlow()
.debounce(300) // Wait 300ms of silence before emitting
.distinctUntilChanged() // Skip consecutive duplicates
.retryWhen((error, attempt) => attempt < 3) // Resilient retries
.map(query => searchAPI(query))
.collect(results => displayResults(results));
// Throttling for rate limiting
await scrollEventFlow()
.throttle(100) // Max 1 emission per 100ms
.map(e => window.scrollY)
.distinctUntilChanged()
.collect(position => updateScrollIndicator(position));
// Sampling periodic snapshots
await sensorDataFlow()
.sample(1000) // Emit latest value every second
.map(reading => reading.temperature)
.collect(temp => updateDisplay(temp));
// Buffering for batch processing
await dataStream()
.buffer(100) // Process in batches of 100
.map(batch => processBatch(batch))
.collect(results => saveResults(results));
Try it yourself:
// Basic flow transformations
await flowOf(1, 2, 3, 4, 5, 6)
.map(x => x * 2)
.filter(x => x > 5)
.take(3)
.collect(x => console.log('Value:', x));
console.log('---');
// Create a simple flow
const numbers = flowOf(10, 20, 30, 40);
await numbers.collect(n => console.log('Number:', n));
Advanced Transformations
Powerful operators for complex flow transformations and flattening strategies.
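If "flattening" and "scan" sound abstract, this standalone sketch shows what the sequential variants boil down to with plain async generators. The functions below are hypothetical stand-ins written for illustration; they are not the library's implementation. The merge and latest strategies differ only in how inner streams are scheduled (interleaved, or cancelled when a new outer value arrives).

```typescript
// Hypothetical stand-ins built on async generators; not kotlinify-ts code.
async function* fromArray<T>(items: T[]): AsyncGenerator<T> {
  yield* items;
}

// Sequential flattening: finish each inner stream before starting the next.
async function* flatMapConcat<T, R>(
  source: AsyncIterable<T>,
  fn: (value: T) => AsyncIterable<R>,
): AsyncGenerator<R> {
  for await (const value of source) {
    yield* fn(value);
  }
}

// Running accumulation that emits the initial value first.
async function* scan<T, A>(
  source: AsyncIterable<T>,
  initial: A,
  step: (acc: A, value: T) => A,
): AsyncGenerator<A> {
  let acc = initial;
  yield acc;
  for await (const value of source) {
    acc = step(acc, value);
    yield acc;
  }
}

async function toArray<T>(source: AsyncIterable<T>): Promise<T[]> {
  const out: T[] = [];
  for await (const value of source) out.push(value);
  return out;
}

async function demo(): Promise<void> {
  const flattened = flatMapConcat(fromArray(['a', 'b']), q => fromArray([`${q}1`, `${q}2`]));
  console.log(await toArray(flattened)); // [ 'a1', 'a2', 'b1', 'b2' ]

  const sums = scan(fromArray([1, 2, 3]), 0, (sum, value) => sum + value);
  console.log(await toArray(sums)); // [ 0, 1, 3, 6 ]
}
```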
import { flowOf, flow } from 'kotlinify-ts/flow';
// Custom transformation with multiple emissions
await flowOf(1, 2, 3)
.transform(async (value, emit) => {
await emit(value);
await emit(value * 10);
await emit(value * 100);
})
.collect(console.log); // 1, 10, 100, 2, 20, 200, 3, 30, 300
// Scan for running calculations
await flowOf(1, 2, 3, 4, 5)
.scan(0, (sum, value) => sum + value)
.collect(console.log); // 0, 1, 3, 6, 10, 15
// Add index to values
await flowOf('a', 'b', 'c')
.withIndex()
.collect(([index, value]) =>
console.log(`${index}: ${value}`)
); // 0: a, 1: b, 2: c
// FlatMap strategies
const searchFlow = (query: string) => flow(async function* () {
const results = await searchAPI(query);
for (const result of results) yield result;
});
// flatMapConcat: Process sequentially
await flowOf('kotlin', 'typescript', 'rust')
.flatMapConcat(query => searchFlow(query))
.collect(console.log); // All kotlin results, then typescript, then rust
// flatMapMerge: Process in parallel (with concurrency limit)
await flowOf('kotlin', 'typescript', 'rust')
.flatMapMerge(2, query => searchFlow(query))
.collect(console.log); // Results arrive as they complete, max 2 concurrent
// flatMapLatest: Cancel previous on new emission
await userInputFlow()
.debounce(300) // Wait for idle input before switching requests
.flatMapLatest(query => searchFlow(query))
.collect(displayResults); // Cancels previous search on new input
StateFlow - Reactive State Management
StateFlow holds a single value and emits updates to all collectors. Perfect for UI state management.
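As a mental model, a state holder of this kind is small enough to sketch in a few lines. `MiniState` below is a hypothetical, synchronous miniature written for illustration; it is not the kotlinify-ts `MutableStateFlow`, and it assumes that equal values are skipped and that new subscribers receive the current value right away.

```typescript
// Hypothetical miniature of a state holder; not the library's MutableStateFlow.
type Listener<T> = (value: T) => void;

class MiniState<T> {
  private current: T;
  private listeners = new Set<Listener<T>>();

  constructor(initial: T) {
    this.current = initial;
  }

  get value(): T {
    return this.current;
  }

  // Setting an equal value is a no-op, so listeners only see real changes.
  set value(next: T) {
    if (Object.is(next, this.current)) return;
    this.current = next;
    this.listeners.forEach(listener => listener(next));
  }

  // Swap only if the current value is the expected one.
  compareAndSet(expected: T, next: T): boolean {
    if (!Object.is(this.current, expected)) return false;
    this.value = next;
    return true;
  }

  // New subscribers get the current value immediately, then every change.
  subscribe(listener: Listener<T>): () => void {
    listener(this.current);
    this.listeners.add(listener);
    return () => { this.listeners.delete(listener); };
  }
}

const counter = new MiniState(0);
const seen: number[] = [];
const unsubscribe = counter.subscribe(v => { seen.push(v); });
counter.value = 5;
counter.value = 5;            // ignored: same value
counter.compareAndSet(5, 10); // true
unsubscribe();
counter.value = 11;           // nobody is listening any more
console.log(seen);            // [ 0, 5, 10 ]
```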
import { MutableStateFlow } from 'kotlinify-ts/flow';
// Create reactive state
const counter = new MutableStateFlow(0);
const user = new MutableStateFlow<User | null>(null);
const theme = new MutableStateFlow<'light' | 'dark'>('light');
// Read current value synchronously
console.log(counter.value); // 0
// Update state
counter.value = 5;
counter.update(current => current + 1); // Functional update
user.value = { id: 1, name: 'Alice' };
// Compare and set atomically
const wasUpdated = counter.compareAndSet(6, 10);
console.log(wasUpdated); // true if was 6, false otherwise
// Subscribe to changes (always emits current value first)
await counter.collect(count => {
console.log(`Count: ${count}`);
}); // Immediately emits 10, then future updates
// Combine multiple states
const appState = new MutableStateFlow({
user: null as User | null,
theme: 'light' as Theme,
notifications: [] as Notification[]
});
// React component example (useState/useEffect come from 'react'; launch is assumed to come from 'kotlinify-ts/coroutines')
function useFlowState<T>(flow: StateFlow<T>) {
const [value, setValue] = useState(flow.value);
useEffect(() => {
const job = launch(() =>
flow.collect(setValue)
);
return () => job.cancel();
}, [flow]);
return value;
}
// WebSocket connection state
const connectionState = new MutableStateFlow<
'disconnected' | 'connecting' | 'connected' | 'error'
>('disconnected');
websocket.onopen = () => connectionState.value = 'connected';
websocket.onerror = () => connectionState.value = 'error';
websocket.onclose = () => connectionState.value = 'disconnected';
await connectionState
.distinctUntilChanged()
.collect(state => updateConnectionUI(state));
SharedFlow - Event Broadcasting
SharedFlow broadcasts values to multiple collectors without state. Configurable replay and buffering.
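The replay idea is easiest to see in a tiny synchronous model. `MiniBroadcast` is a hypothetical class invented for this sketch, not the library's `MutableSharedFlow`; it assumes that late subscribers first receive the cached values and then the live ones.

```typescript
// Hypothetical miniature of a replaying broadcaster; not kotlinify-ts code.
class MiniBroadcast<T> {
  private subscribers = new Set<(value: T) => void>();
  private cache: T[] = [];
  private replay: number;

  constructor(replay = 0) {
    this.replay = replay;
  }

  emit(value: T): void {
    if (this.replay > 0) {
      this.cache.push(value);
      if (this.cache.length > this.replay) this.cache.shift(); // keep last N only
    }
    this.subscribers.forEach(subscriber => subscriber(value));
  }

  // Late subscribers first receive the cached values, then live ones.
  subscribe(subscriber: (value: T) => void): () => void {
    this.cache.forEach(value => subscriber(value));
    this.subscribers.add(subscriber);
    return () => { this.subscribers.delete(subscriber); };
  }

  get subscriptionCount(): number {
    return this.subscribers.size;
  }
}

const chat = new MiniBroadcast<string>(2);
chat.emit('a');
chat.emit('b');
chat.emit('c');

const late: string[] = [];
chat.subscribe(m => { late.push(m); }); // replays 'b' and 'c'
chat.emit('d');
console.log(late); // [ 'b', 'c', 'd' ]
```

With `replay` left at 0 the same class behaves like a plain event bus: anything emitted before a subscriber arrives is simply gone.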
import { MutableSharedFlow, SharedFlow } from 'kotlinify-ts/flow';
// Basic event bus
const events = new MutableSharedFlow<AppEvent>();
// With replay cache (new subscribers get last N values)
const messages = new MutableSharedFlow<Message>({
replay: 5 // New collectors get last 5 messages
});
// With buffer for handling slow collectors
const dataStream = new MutableSharedFlow<Data>({
replay: 0,
extraBufferCapacity: 64,
onBufferOverflow: 'DROP_OLDEST' // or 'DROP_LATEST', 'SUSPEND'
});
// Emit events
await events.emit({ type: 'user-login', userId: 123 });
events.tryEmit({ type: 'page-view', path: '/home' }); // Non-suspending
// Multiple collectors (started without await: a SharedFlow collector keeps running until cancelled)
const collectors = Promise.all([
  events.collect(e => logEvent(e)),
  events.collect(e => sendAnalytics(e)),
  events.collect(e => updateUI(e))
]);
// Check active subscriptions
console.log(events.subscriptionCount); // 3
// Access replay cache
console.log(messages.replayCache); // Array of last N messages
// Clean up
events.resetReplayCache();
events.cancelAll(); // Cancel all collectors
// Real-world: WebSocket message distribution
const wsMessages = new MutableSharedFlow<WSMessage>({ replay: 1 });
websocket.onmessage = (e) => {
const message = JSON.parse(e.data);
wsMessages.tryEmit(message);
};
// Different components subscribe to different message types.
// Run the collectors concurrently: awaiting the first one alone would block the second.
await Promise.all([
  wsMessages
    .filter(msg => msg.type === 'chat')
    .collect(msg => displayChatMessage(msg)),
  wsMessages
    .filter(msg => msg.type === 'notification')
    .collect(msg => showNotification(msg))
]);
Combining Flows
Combine multiple flows to create complex reactive data pipelines.
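The difference between pairing values (zip) and tracking the latest of each side (combine) is easier to see without any async machinery. The helpers below are hypothetical and work on plain arrays; stopping at the shorter input in `zipArrays` mirrors Kotlin's `zip` and is an assumption about how the library behaves.

```typescript
// Hypothetical array-based models of zip and combine; not kotlinify-ts code.

// zip pairs the i-th value of each side and stops at the shorter input.
function zipArrays<A, B, R>(a: A[], b: B[], fn: (a: A, b: B) => R): R[] {
  const out: R[] = [];
  const length = Math.min(a.length, b.length);
  for (let i = 0; i < length; i++) out.push(fn(a[i], b[i]));
  return out;
}

// combine works on an interleaved timeline: every event updates one side,
// and a result is produced once both sides have a value.
type Tagged<A, B> = { side: 'a'; value: A } | { side: 'b'; value: B };

function combineLatest<A, B, R>(timeline: Tagged<A, B>[], fn: (a: A, b: B) => R): R[] {
  const out: R[] = [];
  let latestA: { value: A } | undefined;
  let latestB: { value: B } | undefined;
  for (const item of timeline) {
    if (item.side === 'a') latestA = { value: item.value };
    else latestB = { value: item.value };
    if (latestA && latestB) out.push(fn(latestA.value, latestB.value));
  }
  return out;
}

console.log(zipArrays(['Name?', 'Age?', 'City?'], ['Alice', '25'], (q, a) => `${q} ${a}`));
// [ 'Name? Alice', 'Age? 25' ]

const updates: Tagged<string, string>[] = [
  { side: 'a', value: 'alice' },
  { side: 'b', value: 'dark' },
  { side: 'a', value: 'bob' },
  { side: 'b', value: 'light' },
];
console.log(combineLatest(updates, (user, theme) => `${user}/${theme}`));
// [ 'alice/dark', 'bob/dark', 'bob/light' ]
```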
import { combine, zip, merge, flowOf, MutableStateFlow, callbackFlow } from 'kotlinify-ts/flow';
// Combine: Emits when any flow emits (after all have emitted once)
const userFlow = new MutableStateFlow(currentUser);
const settingsFlow = new MutableStateFlow(currentSettings);
const themeFlow = new MutableStateFlow(currentTheme);
await combine(
userFlow,
settingsFlow,
(user, settings) => ({ user, settings })
)
.collect(state => renderApp(state));
// Emits whenever user OR settings change
// Zip: Pairs values from flows
const questionsFlow = flowOf('Name?', 'Age?', 'City?');
const answersFlow = flowOf('Alice', '25', 'NYC');
await zip(
questionsFlow,
answersFlow,
(q, a) => `${q} ${a}`
)
.collect(console.log);
// "Name? Alice", "Age? 25", "City? NYC"
// Merge: Combine emissions from multiple flows
const mouseFlow = callbackFlow(/* mouse events */);
const keyboardFlow = callbackFlow(/* keyboard events */);
const touchFlow = callbackFlow(/* touch events */);
await merge(mouseFlow, keyboardFlow, touchFlow)
.throttle(50)
.collect(event => handleUserInput(event));
// Real-world: Form validation
const emailFlow = new MutableStateFlow('');
const passwordFlow = new MutableStateFlow('');
await combine(
emailFlow,
passwordFlow,
(email, password) => ({
email,
password,
isValid: email.includes('@') && password.length >= 8
})
)
.distinctUntilChangedBy(state => state.isValid)
.collect(state => {
submitButton.disabled = !state.isValid;
});
Terminal Operations
Collect flow values into different forms or compute aggregations.
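A quick standalone sketch of what terminal operations do under the hood: they drain an async source and hand back a single result. The helper names are hypothetical, and the "reduce throws on empty input" behavior follows Kotlin's convention, which is an assumption about this library.

```typescript
// Hypothetical terminal helpers over async iterables; not kotlinify-ts code.
async function* numbersOf(...values: number[]): AsyncGenerator<number> {
  yield* values;
}

// fold always has a starting value, so it is well defined for empty input.
async function fold<T, A>(
  source: AsyncIterable<T>,
  initial: A,
  step: (acc: A, value: T) => A,
): Promise<A> {
  let acc = initial;
  for await (const value of source) acc = step(acc, value);
  return acc;
}

// reduce uses the first element as the start, so empty input is an error here.
async function reduce<T>(source: AsyncIterable<T>, step: (acc: T, value: T) => T): Promise<T> {
  let acc: T | undefined;
  let hasValue = false;
  for await (const value of source) {
    acc = hasValue ? step(acc as T, value) : value;
    hasValue = true;
  }
  if (!hasValue) throw new Error('reduce on an empty stream');
  return acc as T;
}

// Short-circuiting predicate: returning from for-await closes the source early.
async function anyMatch<T>(
  source: AsyncIterable<T>,
  predicate: (value: T) => boolean,
): Promise<boolean> {
  for await (const value of source) {
    if (predicate(value)) return true;
  }
  return false;
}

async function demo(): Promise<void> {
  console.log(await reduce(numbersOf(1, 2, 3, 4), (a, b) => a + b));        // 10
  console.log(await fold(numbersOf(1, 2, 3, 4), 1, (acc, v) => acc * v));   // 24
  console.log(await anyMatch(numbersOf(1, 3, 5, 6), x => x % 2 === 0));     // true
}
```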
import { flowOf } from 'kotlinify-ts/flow';
// Collect to collections
const array = await flowOf(1, 2, 3).toArray(); // [1, 2, 3]
const list = await flowOf(1, 2, 3).toList(); // [1, 2, 3]
const set = await flowOf(1, 1, 2, 3).toSet(); // Set{1, 2, 3}
// First and last
const first = await flowOf(1, 2, 3).first(); // 1
const last = await flowOf(1, 2, 3).last(); // 3
const firstOrNull = await flowOf<number>().firstOrNull(); // null
// Single value (throws if not exactly one)
const single = await flowOf(42).single(); // 42
const singleOrNull = await flowOf(1, 2).singleOrNull(); // null
// Aggregations
const sum = await flowOf(1, 2, 3, 4)
.reduce((a, b) => a + b); // 10
const product = await flowOf(1, 2, 3, 4)
.fold(1, (acc, value) => acc * value); // 24
const count = await flowOf(1, 2, 3).count(); // 3
// Predicates
const hasEven = await flowOf(1, 3, 5, 6)
.any(x => x % 2 === 0); // true
const allPositive = await flowOf(1, 2, 3)
.all(x => x > 0); // true
const noneNegative = await flowOf(1, 2, 3)
.none(x => x < 0); // true
// Real-world: API response validation
const isValid = await apiResponseFlow()
.map(response => response.data)
.all(data => data.status === 'success');
const errorCount = await logFlow()
.filter(log => log.level === 'ERROR')
.count();
Error Handling & Resilience
Handle errors gracefully with retry strategies and fallback mechanisms.
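For reference, the retry-with-backoff strategy itself fits in one small function. The sketch below is independent of kotlinify-ts; `retryWithBackoff` and its option names are hypothetical, and `sleep` is injectable only so the delays can be observed in tests.

```typescript
// Hypothetical helper; the option names are illustrative, not a library API.
interface BackoffOptions {
  maxAttempts: number; // total tries, including the first one
  baseDelayMs: number; // delay before the first retry
  maxDelayMs: number;  // upper bound for any single delay
  sleep?: (ms: number) => Promise<void>; // injectable for tests
}

const defaultSleep = (ms: number): Promise<void> =>
  new Promise<void>(resolve => setTimeout(resolve, ms));

async function retryWithBackoff<T>(task: () => Promise<T>, options: BackoffOptions): Promise<T> {
  const sleep = options.sleep ?? defaultSleep;
  let lastError: unknown;
  for (let attempt = 0; attempt < options.maxAttempts; attempt++) {
    try {
      return await task();
    } catch (error) {
      lastError = error;
      if (attempt === options.maxAttempts - 1) break; // no wait after the final failure
      // Exponential growth (base, 2x base, 4x base, ...) capped at maxDelayMs.
      const wait = Math.min(options.baseDelayMs * Math.pow(2, attempt), options.maxDelayMs);
      await sleep(wait);
    }
  }
  throw lastError;
}
```

A call such as `retryWithBackoff(() => fetchAPI(), { maxAttempts: 5, baseDelayMs: 1000, maxDelayMs: 30000 })` would wait 1 s, 2 s, 4 s and 8 s between the five tries, where `fetchAPI` stands for whatever async call you need to protect.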
import { flow, flowOf } from 'kotlinify-ts/flow';
// Catch and handle errors
await flow(async function* () {
yield 1;
yield 2;
throw new Error('Something went wrong');
yield 3; // Never reached
})
.catch(error => console.error('Caught:', error.message))
.collect(console.log); // 1, 2, then error logged
// Retry with fixed attempts
await fetchDataFlow()
.retry(3) // Retry up to 3 times on error
.collect(data => processData(data));
// Retry with custom logic
await apiFlow()
.retryWhen((error, attempt) => {
if (error.code === 'RATE_LIMIT') {
return attempt < 5; // Retry rate limits up to 5 times
}
return false; // Don't retry other errors
})
.collect(response => handleResponse(response));
// Default values for empty flows
await possiblyEmptyFlow()
.defaultIfEmpty({ id: 0, name: 'Default' })
.collect(console.log); // Emits default if flow is empty
// Handle empty flows
await searchFlow(query)
.onEmpty(() => console.log('No results found'))
.collect(result => displayResult(result));
// Lifecycle hooks for cleanup
await resourceFlow()
.onStart(() => console.log('Starting flow'))
.onEach(value => console.log('Processing:', value))
.onCompletion(() => console.log('Flow completed'))
.catch(error => console.error('Flow error:', error))
.collect(value => processValue(value));
// Real-world: Resilient API polling
const pollAPI = () => flow(async function* () {
while (true) {
try {
const data = await fetchAPI();
yield data;
} catch (error) {
console.error('Poll failed:', error);
yield null; // Emit null on error
}
await new Promise(r => setTimeout(r, 5000));
}
});
await pollAPI()
.filter(data => data !== null)
.retryWhen((error, attempt) => {
const delay = Math.min(1000 * Math.pow(2, attempt), 30000);
return new Promise(resolve =>
setTimeout(() => resolve(attempt < 10), delay)
);
})
.collect(data => updateDashboard(data));
Real-World Examples
Practical patterns for common reactive programming scenarios.
import { MutableStateFlow, MutableSharedFlow, callbackFlow, flow, combine, merge } from 'kotlinify-ts/flow';
// 1. Search with debounce and cancellation
const searchInput = new MutableStateFlow('');
searchInput
.debounce(300) // Wait for typing to pause
.filter(query => query.length > 2)
.distinctUntilChanged()
.flatMapLatest(query =>
flow(async function* () {
const results = await searchAPI(query);
yield* results;
})
)
.catch(error => {
console.error('Search failed:', error);
return [];
})
.collect(results => renderSearchResults(results));
// 2. WebSocket reconnection with exponential backoff
const createWebSocketFlow = () => callbackFlow<MessageEvent>(scope => {
let ws: WebSocket;
let reconnectDelay = 1000;
const connect = () => {
ws = new WebSocket('wss://api.example.com');
ws.onmessage = (e) => scope.emit(e);
ws.onopen = () => {
console.log('Connected');
reconnectDelay = 1000; // Reset delay
};
ws.onerror = (e) => console.error('WebSocket error:', e);
ws.onclose = () => {
if (scope.isActive) {
setTimeout(connect, reconnectDelay);
reconnectDelay = Math.min(reconnectDelay * 2, 30000);
}
};
};
connect();
scope.onClose(() => ws?.close());
});
await createWebSocketFlow()
.map(e => JSON.parse(e.data))
.filter(msg => msg.type === 'data')
.collect(data => processRealtimeData(data));
// 3. Infinite scroll with pagination
const scrollFlow = callbackFlow<number>(scope => {
const handler = () => {
const { scrollTop, scrollHeight, clientHeight } = document.documentElement;
if (scrollTop + clientHeight >= scrollHeight - 100) {
scope.emit(Math.floor(scrollTop / 1000)); // Page number
}
};
window.addEventListener('scroll', handler);
scope.onClose(() => window.removeEventListener('scroll', handler));
});
const currentPage = new MutableStateFlow(0);
await scrollFlow
.distinctUntilChanged()
.collect(page => {
currentPage.value = page;
loadMoreContent(page);
});
// 4. Real-time collaborative editing
interface Edit { userId: string; changes: any; timestamp: number; }
const localEdits = new MutableSharedFlow<Edit>();
const remoteEdits = new MutableSharedFlow<Edit>();
const documentState = new MutableStateFlow(initialDocument);
await merge(localEdits, remoteEdits)
.scan(initialDocument, (doc, edit) => applyEdit(doc, edit))
.distinctUntilChanged()
.collect(doc => {
documentState.value = doc;
renderDocument(doc);
});
// 5. System monitoring dashboard
const cpuFlow = flow(async function* () {
while (true) {
yield await getCPUUsage();
await new Promise(r => setTimeout(r, 1000));
}
});
const memoryFlow = flow(async function* () {
while (true) {
yield await getMemoryUsage();
await new Promise(r => setTimeout(r, 1000));
}
});
await combine(
cpuFlow.throttle(5000),
memoryFlow.throttle(5000),
(cpu, memory) => ({ cpu, memory, timestamp: Date.now() })
)
.scan([], (history, stats) =>
  [...history, stats].slice(-60) // Keep the last 60 samples
)
.collect(history => renderChart(history));
Best Practices
Guidelines for effective Flow usage in production applications.
✓ Do
- Use StateFlow for UI state that needs current value access
- Use SharedFlow for events without state
- Apply backpressure strategies (buffer, conflate, sample) for high-frequency streams
- Clean up with onClose() in callbackFlow
- Use distinctUntilChanged() to avoid unnecessary updates
- Prefer cold flows for one-time operations
✗ Don't
- Don't forget to handle errors with catch()
- Don't create flows in tight loops
- Don't ignore backpressure in high-frequency streams
- Don't forget to cancel collectors when done
- Don't use SharedFlow when you need the current value
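To illustrate the backpressure advice above, here is a standalone sketch of a bounded buffer with two overflow policies. `BoundedBuffer` and `OverflowPolicy` are hypothetical names; only the policy labels 'DROP_OLDEST' and 'DROP_LATEST' are borrowed from the SharedFlow options shown earlier, and the behavior is a simplified model rather than the library's implementation.

```typescript
// Hypothetical bounded buffer showing two overflow policies; not a library class.
type OverflowPolicy = 'DROP_OLDEST' | 'DROP_LATEST';

class BoundedBuffer<T> {
  private items: T[] = [];
  private capacity: number;
  private policy: OverflowPolicy;

  constructor(capacity: number, policy: OverflowPolicy) {
    if (capacity < 1) throw new RangeError('capacity must be at least 1');
    this.capacity = capacity;
    this.policy = policy;
  }

  // Returns false when the incoming value was discarded.
  offer(value: T): boolean {
    if (this.items.length < this.capacity) {
      this.items.push(value);
      return true;
    }
    if (this.policy === 'DROP_LATEST') return false;
    this.items.shift(); // DROP_OLDEST: evict the head to make room
    this.items.push(value);
    return true;
  }

  // The consumer takes whatever survived since the last drain.
  drain(): T[] {
    const out = this.items;
    this.items = [];
    return out;
  }
}

const newest = new BoundedBuffer<number>(3, 'DROP_OLDEST');
const earliest = new BoundedBuffer<number>(3, 'DROP_LATEST');
for (const n of [1, 2, 3, 4, 5]) {
  newest.offer(n);
  earliest.offer(n);
}
console.log(newest.drain());   // [ 3, 4, 5 ]
console.log(earliest.drain()); // [ 1, 2, 3 ]
```

Which policy fits depends on the data: for sensor readings the newest values usually matter most, while for an ordered command queue dropping the newest is often the safer failure.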