Process data like arrays. Handle async like promises. Scale like streams.
JavaScript has three ways to handle async data: callbacks, promises, and streams.
Most code uses the first two. But streams are where the real power is.
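To make the contrast concrete, here is the same (made-up) data source consumed in each style; the function names and values are hypothetical, chosen only to show the shape of each approach:

```javascript
// Hypothetical data source used by all three styles below.
const pages = [[1, 2], [3]];

// 1. Callbacks: the caller hands over a function to be invoked later.
function loadFirstPageCb(done) {
  setTimeout(() => done(pages[0]), 0);
}

// 2. Promises: one eventual value, consumed with await.
async function loadFirstPage() {
  return pages[0];
}

// 3. Streams (async iteration): values arrive over time, pulled on demand.
async function* loadAllItems() {
  for (const page of pages) yield* page;
}

async function main() {
  const out = [];
  for await (const item of loadAllItems()) out.push(item);
  return out; // [1, 2, 3]
}
```

Only the streaming version handles an unbounded number of items without holding them all in memory at once.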
The problem? The Streams API is verbose. RxJS is complex. I wanted something simpler.
```javascript
const readable = new ReadableStream({
  async pull(controller) {
    const data = await fetchNextChunk();
    if (data) {
      controller.enqueue(data);
    } else {
      controller.close();
    }
  }
});

const transformed = readable.pipeThrough(
  new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    }
  })
);
```
Powerful but verbose. Too much boilerplate for simple transformations.
```javascript
import { from, mergeMap, filter, toArray } from 'rxjs';

from([1, 2, 3, 4, 5]).pipe(
  mergeMap(id => from(fetch(`/api/${id}`).then(r => r.json()))),
  filter(user => user.isActive),
  toArray()
).subscribe(console.log);
```
100+ operators. Hot vs cold observables. Subscription management. Steep learning curve.
sflow gives you array methods that work on streams:
```javascript
import { sflow } from "sflow";

await sflow([1, 2, 3, 4, 5])
  .map(async id => fetch(`/api/${id}`).then(r => r.json()))
  .filter(user => user.isActive)
  .toArray();
```
Same result. Familiar API. Built on WebStreams.
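The core idea can be sketched without the library (this is a conceptual illustration, not sflow's actual source): an array-style `map` over a WebStream is just a `TransformStream` wired up with `pipeThrough`:

```javascript
// Conceptual sketch (not sflow's real implementation): map() as a TransformStream.
function mapStream(readable, fn) {
  return readable.pipeThrough(
    new TransformStream({
      async transform(chunk, controller) {
        controller.enqueue(await fn(chunk));
      },
    })
  );
}

async function demo() {
  const source = new ReadableStream({
    start(controller) {
      [1, 2, 3].forEach(n => controller.enqueue(n));
      controller.close();
    },
  });
  const reader = mapStream(source, async n => n * 2).getReader();
  const out = [];
  for (;;) {
    const { done, value } = await reader.read();
    if (done) break;
    out.push(value);
  }
  return out; // [2, 4, 6]
}
```

Wrapping that pattern once per operator is how an array-like surface can sit on top of standard streams.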
Every method accepts async functions:
```javascript
sflow(urls)
  .map(async url => {
    const res = await fetch(url);
    return res.json();
  })
  .filter(async data => {
    return await validate(data);
  })
  .toArray();
```
No special async operators. Just async functions.
Methods return `sflow<T>`, enabling fluent chains:
```javascript
sflow(data)
  .filter(x => x > 0)
  .map(x => x * 2)
  .chunk(10)
  .throttle(1000)
  .toArray();
```
WebStreams are pull-based. Data flows when the consumer is ready. No buffer overflow. No dropped items.
```javascript
// This won't overwhelm memory even with millions of items
await sflow(hugeDataset)
  .map(process)
  .forEach(save)
  .run();
```
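The pull-based behavior is observable with the raw API: a source's `pull` callback only runs to keep the internal queue at its high-water mark, so a slow consumer automatically slows production. A minimal sketch:

```javascript
async function demo() {
  let produced = 0;
  // An unbounded source: it could produce forever if asked to.
  const source = new ReadableStream(
    {
      pull(controller) {
        produced++;
        controller.enqueue(produced);
      },
    },
    { highWaterMark: 1 } // keep at most ~1 chunk buffered ahead
  );

  const reader = source.getReader();
  await reader.read(); // consume a single chunk
  // The source has only produced a couple of chunks (queue fill + refill),
  // not millions -- production paced itself to consumption.
  return produced;
}
```

This is the mechanism sflow inherits for free by building on WebStreams: nothing upstream runs faster than downstream can read.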
```javascript
// Map: transform each item
sflow([1, 2, 3]).map(x => x * 2); // [2, 4, 6]

// Filter: keep matching items
sflow([1, 2, 3, 4]).filter(x => x % 2 === 0); // [2, 4]

// FlatMap: map and flatten
sflow([[1, 2], [3, 4]]).flatMap(x => x); // [1, 2, 3, 4]

// Reduce: accumulate to a single value
sflow([1, 2, 3]).reduce((a, b) => a + b, 0); // 6
```
```javascript
// Fixed-size chunks
sflow([1, 2, 3, 4, 5]).chunk(2); // [[1, 2], [3, 4], [5]]

// Time-based chunks
sflow(eventStream).chunkInterval(1000); // Emit every second

// Conditional chunks
sflow(lines).chunkIf(line => line === ""); // Split on empty lines
```
```javascript
// Throttle: max one per interval
sflow(events).throttle(100); // Max 10 per second

// Debounce: wait for silence
sflow(keystrokes).debounce(300); // Wait 300ms after last keystroke
```
```javascript
// Concurrent async operations with limit
await sflow(urls)
  .pMap(
    async url => fetch(url).then(r => r.json()),
    { concurrency: 5 }
  )
  .toArray();
```
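What a concurrency limit does can be sketched in plain promises (a simplified model for illustration, not sflow's implementation): N workers pull from a shared cursor, so at most N calls are ever in flight:

```javascript
// Simplified model of limited-concurrency mapping.
async function pMapSketch(items, fn, concurrency) {
  const results = new Array(items.length);
  let next = 0; // shared cursor; single-threaded JS makes this safe
  async function worker() {
    while (next < items.length) {
      const i = next++;
      results[i] = await fn(items[i]); // at most `concurrency` calls pending
    }
  }
  const workers = Array.from(
    { length: Math.min(concurrency, items.length) },
    worker
  );
  await Promise.all(workers);
  return results;
}
```

For example, `pMapSketch([1, 2, 3, 4], async x => x * 2, 2)` resolves to `[2, 4, 6, 8]` while never having more than two calls pending at once.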
```javascript
// Collect all
const array = await sflow(data).toArray();

// Get first
const first = await sflow(data).toFirst();

// Get last
const last = await sflow(data).toLast();

// Count
const count = await sflow(data).toCount();

// Just run (for side effects)
await sflow(data).forEach(save).run();
```
```javascript
async function* fetchAllPages() {
  let page = 1;
  while (true) {
    const data = await fetch(`/api/items?page=${page}`).then(r => r.json());
    if (data.length === 0) break;
    yield* data;
    page++;
  }
}

const allItems = await sflow(fetchAllPages())
  .filter(item => item.isValid)
  .toArray();
```
```javascript
await sflow(logFileStream)
  .lines()
  .filter(line => line.includes("ERROR"))
  .map(line => parseLogLine(line))
  .chunk(100)
  .forEach(async batch => {
    await sendToMonitoring(batch);
  })
  .run();
```
```javascript
const records = await sflow(csvText)
  .lines()
  .skip(1) // Skip header
  .map(line => {
    const [name, age, email] = line.split(",");
    return { name, age: parseInt(age, 10), email };
  })
  .filter(record => record.age >= 18)
  .toArray();
```
```javascript
await sflow(userIds)
  .chunk(50) // Batches of 50
  .throttle(1000) // One batch per second
  .map(async batch => {
    return fetch("/api/users/batch", {
      method: "POST",
      body: JSON.stringify(batch)
    }).then(r => r.json());
  })
  .flatMap(results => results)
  .forEach(user => updateUI(user))
  .run();
```
```javascript
// MongoDB-style unwind
await sflow([
  { id: 1, tags: ["a", "b"] },
  { id: 2, tags: ["c"] }
])
  .unwind("tags")
  .toArray();
// [{ id: 1, tags: "a" }, { id: 1, tags: "b" }, { id: 2, tags: "c" }]

// Add a computed field
await sflow(users)
  .mapAddField("fullName", u => `${u.first} ${u.last}`)
  .toArray();
```
| Aspect | sflow | RxJS |
|---|---|---|
| Learning curve | Low (array-like) | High (reactive paradigm) |
| Bundle size | ~10KB | ~40KB |
| Async handling | Native | Requires operators |
| Backpressure | Built-in (WebStreams) | Manual |
| Use case | Data pipelines | Reactive UIs |
Choose sflow when:

- You're building data pipelines: ETL, log processing, API batching
- You want an array-like API with a minimal learning curve
- You need backpressure handled for you
- Bundle size matters
Choose RxJS when:

- You're building reactive UIs with complex event composition
- You need its large operator library
- Your team already knows the reactive paradigm
```bash
npm install sflow
```

```javascript
import { sflow } from "sflow";

const result = await sflow(data)
  .map(transform)
  .filter(validate)
  .toArray();
```
Streams shouldn't be harder than arrays.
The WebStreams API is powerful but low-level. RxJS is comprehensive but complex. sflow sits in between: the simplicity of array methods with the power of streaming.
If you can use .map() and .filter() on arrays, you can use sflow.
Install: `npm install sflow`
GitHub: github.com/snomiao/sflow
Demo: sflow-examples.vercel.app
Snowstar Miao builds stream processing tools for modern JavaScript.