約束を超えて:WebStreams APIによるストリーム処理
Translating with AI…
Translating with AI…
配列のようにデータを処理します。Promiseのように非同期を扱います。ストリームのようにスケールします。
JavaScriptには非同期データを処理するための3つの方法があります:
ほとんどのコードは最初の2つを使用します。しかし、ストリームには真の力があります。
問題は?ストリームAPIは冗長です。RxJSは複雑です。私はもっとシンプルなものを求めていました。
const readable = new ReadableStream({
async pull(controller) {
const data = await fetchNextChunk();
if (data) {
controller.enqueue(data);
} else {
controller.close();
}
}
});
const transformed = readable.pipeThrough(
new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
}
})
);
強力ですが冗長です。シンプルな変換には多すぎるボイラープレート。
import { from, mergeMap, filter, toArray } from 'rxjs';
from([1, 2, 3, 4, 5]).pipe(
mergeMap(id => from(fetch(`/api/${id}`).then(r => r.json()))),
filter(user => user.isActive),
toArray()
).subscribe(console.log);
100以上の演算子。ホット vs コールド オブザーバブル。サブスクリプション管理。急な学習曲線。
sflowはストリームで動作する配列メソッドを提供します:
import { sflow } from "sflow";
await sflow([1, 2, 3, 4, 5])
.map(async id => fetch(`/api/${id}`).then(r => r.json()))
.filter(user => user.isActive)
.toArray();
同じ結果。馴染みのあるAPI。WebStreamsの上に構築されています。
すべてのメソッドは非同期関数を受け入れます:
sflow(urls)
.map(async url => {
const res = await fetch(url);
return res.json();
})
.filter(async data => {
return await validate(data);
})
.toArray();
特別な非同期演算子は必要ありません。非同期関数だけです。
メソッドはsflow<T>を返し、流暢なチェーンを可能にします:
sflow(data)
.filter(x => x > 0)
.map(x => x * 2)
.chunk(10)
.throttle(1000)
.toArray();
WebStreamsはプルベースです。消費者が準備できたときにデータが流れます。バッファオーバーフローなし。アイテムがドロップされることはありません。
// これは数百万のアイテムを持っていてもメモリを圧倒しません
await sflow(hugeDataset)
.map(process)
.forEach(save)
.run();
// Map: 各アイテムを変換
sflow([1, 2, 3]).map(x => x * 2); // [2, 4, 6]
// Filter: 一致するアイテムを保持
sflow([1, 2, 3, 4]).filter(x => x % 2 === 0); // [2, 4]
// FlatMap: マッピングしてフラット化
sflow([[1, 2], [3, 4]]).flatMap(x => x); // [1, 2, 3, 4]
// Reduce: 単一の値に累積
sflow([1, 2, 3]).reduce((a, b) => a + b, 0); // 6
// 固定サイズのチャンク
sflow([1, 2, 3, 4, 5]).chunk(2); // [[1, 2], [3, 4], [5]]
// 時間ベースのチャンク
sflow(eventStream).chunkInterval(1000); // 毎秒発行
// 条件付きチャンク
sflow(lines).chunkIf(line => line === ""); // 空白行で分割
// Throttle: インターバルごとに最大1
sflow(events).throttle(100); // 最大10/秒
// Debounce: サイレンスを待つ
sflow(keystrokes).debounce(300); // 最後のキー入力後300ms待つ
// 制限付きの同時非同期操作
await sflow(urls)
.pMap(
async url => fetch(url).then(r => r.json()),
{ concurrency: 5 }
)
.toArray();
// すべてを収集
const array = await sflow(data).toArray();
// 最初を取得
const first = await sflow(data).toFirst();
// 最後を取得
const last = await sflow(data).toLast();
// カウント
const count = await sflow(data).toCount();
// 実行するだけ(副作用のために)
await sflow(data).forEach(save).run();
async function* fetchAllPages() {
let page = 1;
while (true) {
const data = await fetch(`/api/items?page=${page}`).then(r => r.json());
if (data.length === 0) break;
yield* data;
page++;
}
}
const allItems = await sflow(fetchAllPages())
.filter(item => item.isValid)
.toArray();
await sflow(logFileStream)
.lines()
.filter(line => line.includes("ERROR"))
.map(line => parseLogLine(line))
.chunk(100)
.forEach(async batch => {
await sendToMonitoring(batch);
})
.run();
const records = await sflow(csvText)
.lines()
.skip(1) // ヘッダーをスキップ
.map(line => {
const [name, age, email] = line.split(",");
return { name, age: parseInt(age), email };
})
.filter(record => record.age >= 18)
.toArray();
await sflow(userIds)
.chunk(50) // 50のバッチ
.throttle(1000) // 1秒ごとに1バッチ
.map(async batch => {
return fetch("/api/users/batch", {
method: "POST",
body: JSON.stringify(batch)
}).then(r => r.json());
})
.flatMap(results => results)
.forEach(user => updateUI(user))
.run();
// MongoDBスタイルのアンワインド
await sflow([
{ id: 1, tags: ["a", "b"] },
{ id: 2, tags: ["c"] }
])
.unwind("tags")
.toArray();
// [{ id: 1, tags: "a" }, { id: 1, tags: "b" }, { id: 2, tags: "c" }]
// 計算フィールドを追加
await sflow(users)
.mapAddField("fullName", u => `${u.first} ${u.last}`)
.toArray();
| アスペクト | sflow | RxJS |
|---|---|---|
| 学習曲線 | 低(配列に似ている) | 高(リアクティブパラダイム) |
| バンドルサイズ | 約10KB | 約40KB |
| 非同期処理 | ネイティブ | 演算子が必要 |
| バックプレッシャー | 組み込み(WebStreams) | 手動 |
| 使用例 | データパイプライン | リアクティブUI |
sflowを選ぶのは、以下のとき:
RxJSを選ぶのは、以下のとき:
npm install sflow
import { sflow } from "sflow";
const result = await sflow(data)
.map(transform)
.filter(validate)
.toArray();
ストリームは配列よりも難しくあるべきではありません。
WebStreams APIは強力ですが低レベルです。RxJSは包括的ですが複雑です。sflowはその中間に位置します:配列メソッドのシンプルさとストリーミングの力。
もし配列に対して.map()や.filter()が使えるなら、sflowも使えます。
インストール: npm install sflow
GitHub: github.com/snomiao/sflow
Snowstar Miaoは現代JavaScriptのためのストリーム処理ツールを作成しています。