글 목록으로 이동

봄가을 블로그

기술2021년 12월 07일--views

Node.js 스트림(stream) 개념을 익혀보자

Stream 자료구조를 간단하게 익히고 Readable 클래스를 직접 확장하여 구현해봅니다

목표

  • Stream 에 대한 개념 이해
  • 간단하게 Readable 클래스 확장구현하기

사전지식

  • Node.js (본 글에 있는 코든 코드는 Node.js 에서 실행합니다. )
  • 자바스크립트 ES6 문법
  • 클래스 확장/상속

이 글에서 다루지 않는 것

  • Stream 의 다양한 이벤트
  • Readable, Writable 생성자에 넘기는 옵션
  • pipe 함수의 정확한 동작 방식
  • Writable, Duplex, Transform 직접 만들기

개요

스트림이라는 말을 처음 들어봤을 수도 있겠지요? 하지만 이미 우리의 일상 속에 깊이 자리잡고 있는 녀석입니다. console.log 가 우리의 터미널로 출력시킬 때 이용하는 process.stdout 또한 Stream 객체입니다. 그런데 스트림은 어떤 개념이고 어떻게 써야 잘 쓰는 걸까요?

스트림은 무엇인가?

위키백과 왈, 시간이 지남에 따라 사용할 수 있게 되는 일련의 데이터 요소라고 합니다. 필자도 이게 무슨 소리인지 잘 모르겠으므로 그냥 필자가 이해한대로 나불대려고 합니다.

스트림은 "순차적인 데이터" 입니다. 보통 우리가 생각하는 데이터라는 것은 내용이 있겠고, 그 내용 만큼 크기가 결정될 겁니다. 하지만 순차적인 데이터란 크기가 중요하지 않습니다. 데이터가 그냥 흘러갈 뿐입니다. 이 말이 무슨 말이냐? 흘러가는 데이터가 뭐가 있을까요? 그리고 데이터를 왜 흘러가도록 할까요?

스트림을 왜 쓰는가? 어떤 특징이 있는가?

데이터를 흘러가도록 하는 이유는 메모리를 아끼기 위해서입니다. 만약 100GB 단위의 데이터를 압축해서 zip 파일을 만들어야 한다고 가정해봅시다. 우리는 일단 데이터를 처리하기 위해 SSD 상에서의 데이터를 메모리로 로딩해야 합니다. 하지만 우리의 메모리는 작으면 8GB, 보통이면 16GB, 많으면 32GB 쯤 밖에 하지 않겠죠. 여기에는 100GB 라는 큰 데이터를 모두 담을 수 없습니다. 그럼 우리는 메모리의 최대 용량까지만 파일을 압축할 수 있을까요?

그렇지 않습니다. 왜냐하면 압축하는 프로그램은 100GB 의 큰 데이터를 전부 읽지 않아도 앞에서부터 차례대로 압축을 수행할 수 있습니다. 우리의 똑똑한 압축 프로그램은 커다란 파일의 앞에서부터 차례대로 조금조금씩 데이터를 압축하는 방법을 알고 있습니다. 처음부터 모든 데이터를 알 필요 없이 끝날 때까지 차례대로 처리해나가면 되는 것이지요.

또 다른 예시로는, 특성상 어쩔 수 없이 흘러가는 데이터로 처리해야 할 경우가 있습니다. 우리가 블로그를 운영한다고 가정합시다. 블로그 웹서버는 클라이언트로부터 요청이 들어오면 글을 다시 클라이언트에게 보냅니다. 여기서 그 요청이라는 데이터는 총량이 정해지지 않습니다. 정해질 수 없습니다. 왜냐하면 계속 사람들이 들어올 거니까요. 계속해서요. 끝없이. 그러니까 데이터의 총량과는 상관없이 요청 하나 들어오면 그 하나에 대해 제대로 처리하는 게 중요한 겁니다.

이처럼 스트림은 데이터가 끝이 날 수도 있고 안날 수도 있습니다. 앞서 이야기했던 사례 중 파일을 압축하는 사례는 파일의 끝이 명백하게 정해져 있습니다. 그 끝이 언제일지는 모르지만 아무튼 끝이라는 게 있기는 한 것이지요. 그 끝까지 압축을 잘 해야 압축을 했다! 라고 말할 수 있습니다.

하지만 웹서버의 경우에는 조금 다릅니다. 요청이란 것은 블로그를 폐쇄하기 전까지는 (죽기 전까지는) 계속 받을 수 있어야 합니다. 마치 숨쉬는 것처럼요. 세상이 종말할 때에야 끝은 나겠지요. 웹서버는 그냥 켜져 있는 상태, 외부로부터 요청을 받아들일 수 있는 상태가 중요한 겁니다.

일반적으로 스트림에는 읽기 전용 스트림과 쓰기 전용 스트림이 있습니다. Node.js 에서는 읽기 전용을 Readable, 쓰기 전용을 Writable 클래스를 활용하여 구현하고 있지요. 왜 이렇게 두 개로 나뉘어 놨냐구요? 뭐 저도 잘은 모르겠지만 좀 더 역할을 명확하게 주기 위해서이지 않을까 싶어요. 스트림을 사용하는 개발자 입장에선는 읽을 수 있는(Readable) 스트림이란 데이터가 스트림 내부에서 외부로 빠져나올 수 있다는 뜻이고, 쓸 수 있는(Writable) 스트림이란 갖고 있던 내이터를 스트림 내부로 전달시킨다는 뜻이겠지요. 이게 무슨 말이냐구요?

우리가 압축 프로그램을 만드는 개발자라고 가정을 해봐요. 100GB 짜리 파일을 압축시키려면 일단 파일을 읽기 시작해야 합니다. 그 다음 적절한 압축 알고리즘을 이용해 데이터를 좀 줄이고, 새로운 파일에 데이터를 차곡차곡 쌓아 나가야겠죠?

이미 있는 스트림들

process.stdout

console.log 가 기본적으로 사용하는 process.stdout은 스트림이라고 했습니다. 그럼 그 스트림을 직접 다뤄볼까요?


process.stdout.write("Hello, world!\\n");
process.stdout.write("Hello, world! 2\\n");

아래는 그 결과입니다.


Hello, world!
Hello, world! 2

정말 간단한 예시이죠?

스트림은 순차적인 데이터라고 했습니다. 그래서 뭔가 데이터가 전달되어도 그게 끝이 아닐 수 있다는 것을 스트림 본인 스스로는 너무 잘 압니다. 그래서 write 한 번 했다고 해서 끝나지 않습니다. 우리는 계속해서 write 메소드를 호출할 수 있습니다. 위에서는 두 번 밖에 안했지만 이론상 무제한 write 할 수 있는 겁니다.

process.stdout 는 Writable 스트림입니다. 쓸 수 있는 스트림이니까 쓸 수 있어야겠지요? 그래서 write 메소드가 있습니다. 간단하게 써 봤습니다.

Writable 스트림이 무엇인지는 조금 있다가 더 자세하게 설명하고, 또 다른 Writable 스트림을 살펴볼까요?

createWriteStream


const { createWriteStream } = require("fs");
const stream = createWriteStream("output.txt");
stream.write("hey");
stream.write("stack");

output.txt 파일이 생겼을 겁니다.

createWriteStream 함수는 파일을 작성할 수 있는 Writable 스트림을 만들어서 반환합니다. 마찬가지로 write 메소드를 사용할 수 있습니다. 물론 여러 번 사용할 수 있습니다.

우리는 무한정 데이터를 주고 싶을 때도 있겠지만, 그렇지 않을 때도 있을 겁니다. 끝을 내고 싶다 이 말입니다! 그럴 때에는 end 메소드를 사용할 수 있습니다.


process.stdout.write("Hello, world!\\n");
process.stdout.end();
process.stdout.write("ho");

위 코드를 실행시키면 아래와 같은 결과가 나옵니다.


Hello, world!
events.js:377
throw er; // Unhandled 'error' event
^
Error [ERR_STREAM_WRITE_AFTER_END]: write after end
at WriteStream.Writable.write (internal/streams/writable.js:292:11)
at Object.<anonymous> (/Users/th.kim/Desktop/playground-js/stream.js:3:16)
at Module._compile (internal/modules/cjs/loader.js:1072:14)
at Object.Module._extensions..js (internal/modules/cjs/loader.js:1101:10)
at Module.load (internal/modules/cjs/loader.js:937:32)
at Function.Module._load (internal/modules/cjs/loader.js:778:12)
at Function.executeUserEntryPoint [as runMain] (internal/modules/run_main.js:76:12)
at internal/main/run_main_module.js:17:47
Emitted 'error' event on WriteStream instance at:
at WriteStream.onerror (internal/streams/readable.js:745:14)
at WriteStream.emit (events.js:400:28)
at emitErrorNT (internal/streams/destroy.js:106:8)
at processTicksAndRejections (internal/process/task_queues.js:82:21) {
code: 'ERR_STREAM_WRITE_AFTER_END'
}

성대한 오류가 발생했죠? Hello, world! 까지는 제대로 출력됐는데, 그 다음에 뭔가 에러가 떴네요. 그렇습니다. end 호출을 통해 스트림이 끝나고 나면 write 를 수행할 수 없습니다. 끝나면 그냥 끝난 거예요. 다시 살릴 수가 없어요.

createReadStream

createWriteStream 을 써봤으니 이제 createReadStream 을 써봐야겠죠? ReadStream 은 파일을 읽을 때 사용합니다.

우선 아래 내용으로 input.txt 파일을 만들어봅시다. 파일 위치는 코드 파일과 똑같은 폴더에 넣습니다.


123456789

그리고 아래와 같이 코드를 작성합니다.


const { createReadStream } = require("fs");
const stream = createReadStream("./input.txt");
stream.on("data", (chunk) => {
console.log(`chunk: ${chunk}`);
});

이제 이 코드를 실행시키면?


chunk: 123456789

스트림을 이용하는 법이 살짝 달라졌죠? on 메소드를 이용하여 이벤트 핸들러를 등록시킵니다. 이 이벤트의 이름은 data 이고, 우리는 chunk를 인수로 받아 그대로 출력해봤습니다.

createReadStream 을 호출하여 스트림 객체를 만들기만 했는데 바로 파일을 읽기 시작한 것 같습니다. 왜냐하면 파일의 내용이 모두 출력되었기 때문이죠! 내부에서 data 이벤트가 일어났다는 뜻이고, 그 이벤트가 언제 어떻게 일어난 건진 모르겠지만요.

? createWriteStream 함수를 호출하면 WriteStream 객체가 나오는데요, 이 객체는 Writable 의 모든 인터페이스를 구현하므로, Writable 이라고도 말할 수 있습니다. (K5는 자동차의 모든 요건을 충족하므로 자동차라고도 이야기할 수 있죠) createReadStream 함수 호출로 인해 나오는 ReadStream 또한 Readable 의 모든 인터페이스를 구현하므로 Readable 입니다.

지금까지 요약

  • 스트림이란? 순차적인 데이터이다.
  • 스트림을 쓰는 이유는? 데이터를 차례대로 처리할 수 있기 때문에 아주 거대하거나 끝이 없는 데이터를 다루기 좋다.
  • 스트림에는 크게 읽기 전용 스트림(Readable), 쓰기 전용 스트림(Writable)이 있다.
  • 파일에 내용을 쓰는 스트림을 만드려면? createWriteStream 함수를 이용한다.
  • 스트림을 이용해 파일을 읽으려면? createReadStream 함수를 이용한다.
  • Writable 에는 write, end 등의 메소드가 있다.
  • Readable 는 on 을 이용해 이벤트 핸들러를 등록하여 데이터를 처리한다. (Writable 에도 물론 on이 있지만 이 글에서 추가로 다루지 않습니다.)

나만의 Readable 만들기

그냥 다음 코드를 따라 써보세요.


const { Readable } = require("stream");
class OneToNine extends Readable {
i = 1;
_read(size) {
if (this.i === 10) {
this.push(null);
return;
}
this.push(this.i.toString());
this.i += 1;
}
}
const oneToNine = new OneToNine();
oneToNine.pipe(process.stdout);

아래는 실행 결과입니다.


123456789

우왕~ 숫자가 나왔네요? 이 코드를 해석해봅시다.


class OneToNine extends Readable { ... }

일단 우리는 OneToNine 클래스를 정의했습니다. Readable 클래스를 확장을 했구요.



_read(size) {
if (this.i === 10) {
this.push(null);
return;
}
this.push(this.i.toString());
this.i += 1;
}

Readable로부터 확장할 수 있는 메소드는 여러가지 종류가 있는데, 일단 우리는 _read 하나만 재정의했습니다. _read 메소드는 우리가 만든 클래스가 실제로 동작할 때 적절한 시점에 적절하게 호출될 것입니다. 그냥 믿으세요. _read를 재정의할 때 유의해야 할 점만 잘 챙기면 됩니다.

_read 함수의 내부에서는 this.push 를 호출해야 합니다. push 메소드 또한 이미 Readable 에 정의되어 있습니다. 그 역할은 이 스트림을 읽으려고 하는 놈들에게 데이터를 넘기는 역할입니다.

_read 는 계속해서 호출될 수 있습니다. 스트림 자체가 끝이 없는 무한한 데이터이다 보니 기본적으로는 무제한으로 호출합니다. 이걸 멈추는 것이 바로 this.push(null) 입니다. 마치 반복문 내에서 break; 하는 것과 똑같은 역할을 한다고 보시면 됩니다. this.i는 처음에 1 이었는데, _read 가 진행됨에 따라서 2, 3, 4… 이렇게 1씩 계속 증가할 것입니다.



const oneToNine = new OneToNine();
oneToNine.pipe(process.stdout);

OneToNine 객체를 이용해서 새 oneToNine 객체를 만들었습니다. 여기에 pipe 메소드를 호출하여 process.stdout 이라는 Writable 과 연결해줍니다.

Readable 객체에는 pipe 함수가 있습니다. pipe의 뜻은 무엇이지요? 관이지요! 관을 만들어주는 겁니다. 누구에게요? 바로 Writable 객체로요! pipeReadable 객체와 Writable 객체를 연결하는 순간 뻥 뚫리면서 Readable 에서 읽을 수 있는 건 계속해서 읽어나가고, 읽은 내용들을 Writable 에 자동으로 write 해줍니다.


이를 순서로 따지면 다음과 같습니다. 기술적으로 정확한 건 아니지만, 이해를 돕기 위해 간단하게 설명합니다!

  1. Readable 클래스를 상속하는 새로운 클래스(OneToNine)를 만든다.
  2. 새로운 클래스(OneToNine)를 이용하여 새 객체를 생성한다.
  3. pipe 메소드를 호출한다.
  4. pipe 메소드의 인자로 들어간 Writable(process.stdout) 이 해당 Readable로부터 값을 읽어와야 한다고 인식한다.
  5. WritableReadableread를 호출한다.
  6. Readableread 는, 우리가 직접 만들었던 _read 함수를 호출한다. (언더바!!)
  7. _read 내부에서 push 함수가 호출된다.
  8. push의 인자로 전달된 데이터는 임시 버퍼에 쌓인다.
  9. Writable는 해당 임시 버퍼에서 데이터를 가져간다.

우리는 Readable 을 작성할 때, 데이터가 실제로 어떤 모습을 가지고 있냐 보다는 데이터를 어떻게 읽어들이냐에 집중합니다. 그래서 Readable 클래스를 상속하는 나만의 클래스를 작성하고, new OneToNine() 과 같이 Readable 에 대한 실제 객체를 만들었다 하더라도, 실제 유의미한 일은 일어나지 않습니다. 순차적으로 읽어오는 행위는 _read 메소드가 호출되었을 때 일어납니다. _read 메소드를 호출하는 주체는 같은 객체의 read 메소드 (언더바가 없음)이고, 이 메소드의 호출은 pipe로 연결한 외부 객체가 담당합니다.

사실은, <5~9>번에서 Writable 이 Readable의 read 를 호출한다느니, 임시 버퍼에서 데이터를 가져가느니, 했는데, 이는 사실 정확한 설명이 아닙니다. pipe는 한번에 많은 양을 읽지 않도록, 혹은 한번에 많이 쓰지 않도록 적절한 메커니즘을 내부적으로 구현하고 있습니다.

위 <7> 에서 this.push(null) 이 호출된다면 비로소 이 Readable 의 역할은 끝난다는 걸 알립니다. 그래서 끝이 날 수가 있습니다.

더 공부할 거리

이 글의 분량이 방대해지며 다른 내용들은 도저히 다룰 수가 없다는 걸 깨닫게 되었습니다. 여러가지 생각거리나 검색거리를 남겨봅니다.

아래는 읽을거리구요

마치며

스트림은 왜 이렇게 복잡한 방법으로 이용해야 하는 걸까요? 우리가 만든 클래스에서는 1부터 9까지 만들어내는 아주 간단한 스트림인데도 코드 량이 꽤 깁니다. 데이터를 한꺼번에 표현할 수 있다면 const list = [1, 2, 3, 4, 5, 6, 7, 8, 9] 혹은 Array.from({ length: 9 }).map((_, index) => index + 1) 이렇게 한 줄로도 표현할 수 있잖아요. list[2] 와 같은 식으로 2번 인덱스 데이터에 직접 접근해볼 수도 있지만 스트림은 불가능합니다. 배열을 순차적으로 사용하려면 forEach 등의 메소드를 사용하면 그만이지만, 우리는 Readable 클래스를 확장구현한 이상 Writable 클래스도 확장구현하여 pipe 로 연결해서 사용해야 합니다. 머리가 깨질것 같죠? (Writable 클래스를 확장구현하는 내용은 이 글에서 다루지 않습니다.)

이렇게 단점이 많은데도 불구하고, 무한한 데이터나 엄청나게 고르져스하게 큰 데이터들을 순차적으로 느긋하게 차례차례 확실하게 잘 처리하려면 스트림 말고는 답이 없습니다. 배열은 그러라고 만들어진 놈이 아니기도 하구요.

바꿔 말하자면, 끊기지 않는 무한한 데이터나 큰 데이터를 다룰 필요가 없다면 스트림은 사용할 필요가 없습니다. 해결하고자 하는 문제에 맞게 스트림으로 할 것인지, 단순 배열로 할 것인지는 선택의 문제입니다.

그렇다면 스트림에 대한 개념이 좀 생겼다 기대해보고, 이만 여기까지 하겠습니다.