Node.js FileStreamの読み込みを一時停止しつつ、一括登録を実行する
郵便番号データをTedious BulkLoadで一括登録してみました。Node.js Tedious でBulkLoadを使用して郵便番号データを一括登録
このときは郵便番号約12万件をすべて読みこんで一括登録しましたが、
登録するデータ件数が増えた場合を考慮し、1万件毎にBulkLoadするよう修正してみます。
最初のサンプル
1万件読み込んだらBulkLoadを実行すればよいだろうと修正したソースがこちら。
- const fs = require('fs');
- const readline = require('readline')
- const { Connection, TYPES } = require('tedious')
- // 設定に従いデータベースへ接続
- function create_connection(config) {
- const connection = new Connection(config)
- // Promiseをnewした時点で引数のfunctionが実行される
- const p = new Promise(function(resolve, reject) {
- connection.on('connect', err => {
- if (err) {
- reject(err)
- } else {
- resolve(connection)
- }
- });
- connection.connect()
- });
- return p
- }
- // BulkLoadの実行
- function execBulkLoad(connection, rows) {
- const p = new Promise(function(resolve, reject) {
- // BulkLoad用の設定
- const options = {}
- const bulkLoad = connection.newBulkLoad('postal_code', options, function (error, rowCount) {
- resolve('inserted ' + rowCount + ' rows');
- })
- bulkLoad.addColumn('code', TYPES.Char, { nullable: false })
- bulkLoad.addColumn('address', TYPES.NVarChar, { length: 100, nullable: false })
- rows.forEach((row) => {
- // { code: '郵便番号', address: '住所'} の形式のデータをaddRow
- bulkLoad.addRow(row)
- })
- // バルクロード実行
- connection.execBulkLoad(bulkLoad)
- });
- return p
- }
- async function main() {
- // データベースに接続
- const config = {
- authentication: {
- options: {
- userName: 'sa',
- password: 'P@ssw0rd'
- },
- type: 'default'
- },
- server: 'localhost',
- options: {
- database: 'sample',
- encrypt: false
- }
- }
- const connection = await create_connection(config);
- const stream = fs.createReadStream('./KEN_ALL_UTF8.CSV', 'utf8')
- const reader = readline.createInterface({ input: stream })
- let rows = []
- let rowCount = 0
- reader.on('line', async (data) => {
- // 郵便番号情報を取得
- const item = data.split(',').map((value) => { return value.replace(/^"+|"+$/g,'') })
- const code = item[2]
- const address = item[6] + item[7] + item[8]
- rows.push({code: code, address: address})
- if (rows.length == 10000) {
- const msg = await execBulkLoad(connection, rows)
- console.log(msg)
- rows = []
- }
- rowCount++
- })
- reader.on('close', async () => {
- // 登録実行
- if (rows.length) {
- const msg = await execBulkLoad(connection, rows)
- console.log(msg)
- }
- console.log(rowCount)
- connection.close()
- })
- }
- main()
実行すると、半分程度しか登録されません。
$ node app.js
inserted 10000 rows
inserted 10000 rows
inserted 10000 rows
inserted 10000 rows
inserted 10000 rows
inserted 10000 rows
inserted 876 rows
124517
読み込んだ件数12万件に対し、インサートログは60,874件。
実際にデータベースへ登録されていたのは、60,876件でした。
なんでだ?
pause / resume
BulkLoadを実行中もファイル読み込みが実行されるのでデータ件数が合わないのでは?
ドキュメントを見てみると、streamにはpauseとresumeというメソッドがあります。
https://nodejs.org/api/stream.html#stream_readable_pause
BulkLoad前にpauseを呼び出し。
終わったらresumeを呼び出して処理再開としてみます。
修正箇所の抜粋です。
- reader.on('line', async (data) => {
- // 郵便番号情報を取得
- const item = data.split(',').map((value) => { return value.replace(/^"+|"+$/g,'') })
- const code = item[2]
- const address = item[6] + item[7] + item[8]
- rows.push({code: code, address: address})
- if (rows.length == 10000) {
- reader.pause() // 読み込みを一旦停止
- const msg = await execBulkLoad(connection, rows)
- console.log(msg)
- rows = []
- reader.resume() // 読み込み再開
- }
- rowCount++
- })
$ node app.js
inserted 10000 rows
inserted 10000 rows
inserted 10000 rows
inserted 10000 rows
inserted 10000 rows
inserted 10000 rows
inserted 10000 rows
inserted 10000 rows
inserted 10000 rows
inserted 10000 rows
inserted 10000 rows
inserted 10000 rows
inserted 2091 rows
124517
読み込んだレコード数と登録件数がかなり近づきましたが、まだ漏れがあります。
await pause
pauseが実行されるまでに読み込んでしまうデータがあるのでは?と思い、pauseにawaitをつけてみました。
- reader.on('line', async (data) => {
- // 郵便番号情報を取得
- const item = data.split(',').map((value) => { return value.replace(/^"+|"+$/g,'') })
- const code = item[2]
- const address = item[6] + item[7] + item[8]
- rows.push({code: code, address: address})
- if (rows.length == 10000) {
- await reader.pause() // 読み込みを一旦停止
- const msg = await execBulkLoad(connection, rows)
- console.log(msg)
- rows = []
- reader.resume() // 読み込み再開
- }
- rowCount++
- })
$ node app.js
inserted 10376 rows
inserted 10311 rows
inserted 10123 rows
inserted 10291 rows
inserted 10219 rows
inserted 10029 rows
inserted 10024 rows
inserted 10316 rows
inserted 10190 rows
inserted 10161 rows
inserted 10221 rows
inserted 10165 rows
inserted 2091 rows
124517
これで読み込んだ件数と登録件数が一致してくれました。
・・・しかし、指定した件数でのデータ登録とはならず、分割境界値付近での挙動が気になります。
エディタにもawaitの意味がないという警告が表示されますし。
pause event
pauseしたときに発生するイベント内でデータ登録を行うよう修正しました。
- const fs = require('fs');
- const readline = require('readline')
- const { Connection, TYPES } = require('tedious')
- // 設定に従いデータベースへ接続
- function create_connection(config) {
- const connection = new Connection(config)
- // Promiseをnewした時点で引数のfunctionが実行される
- const p = new Promise(function(resolve, reject) {
- connection.on('connect', err => {
- if (err) {
- reject(err)
- } else {
- resolve(connection)
- }
- });
- connection.connect()
- });
- return p
- }
- // BulkLoadの実行
- function execBulkLoad(connection, rows) {
- const p = new Promise(function(resolve, reject) {
- // BulkLoad用の設定
- const options = {}
- const bulkLoad = connection.newBulkLoad('postal_code', options, function (error, rowCount) {
- resolve('inserted ' + rowCount + ' rows (input:' + rows.length+')');
- })
- bulkLoad.addColumn('code', TYPES.Char, { nullable: false })
- bulkLoad.addColumn('address', TYPES.NVarChar, { length: 100, nullable: false })
- rows.forEach((row) => {
- // { code: '郵便番号', address: '住所'} の形式のデータをaddRow
- bulkLoad.addRow(row)
- })
- // バルクロード実行
- connection.execBulkLoad(bulkLoad)
- });
- return p
- }
- async function main() {
- // データベースに接続
- const config = {
- authentication: {
- options: {
- userName: 'sa',
- password: 'P@ssw0rd'
- },
- type: 'default'
- },
- server: 'localhost',
- options: {
- database: 'sample',
- encrypt: false
- }
- }
- const connection = await create_connection(config);
- const stream = fs.createReadStream('./KEN_ALL_UTF8.CSV', 'utf8')
- const reader = readline.createInterface({ input: stream })
- let rows = []
- let bulkRows = []
- let rowCount = 0
- reader.on('line', (data) => {
- // 郵便番号情報を取得
- const item = data.split(',').map((value) => { return value.replace(/^"+|"+$/g,'') })
- const code = item[2]
- const address = item[6] + item[7] + item[8]
- rows.push({code: code, address: address})
- if (rows.length == 10000) {
- bulkRows.push(rows)
- rows = []
- reader.pause() // 読み込みを一旦停止
- }
- rowCount++
- })
- reader.on('pause', async () => {
- // reader.pauseに加え、closeイベントの前にも呼び出される
- // 登録内容が存在しない場合はスキップ
- if (bulkRows.length == 0) {
- return
- }
- const msg = await execBulkLoad(connection, bulkRows.pop())
- console.log(msg)
- reader.resume() // 読み込み再開
- })
- reader.on('close', async () => {
- // 登録実行
- if (rows.length > 0) {
- const msg = await execBulkLoad(connection, rows)
- console.log(msg)
- }
- console.log(rowCount)
- connection.close()
- })
- }
- main()
狙い通りの実行結果です。
$ node app.js
inserted 10000 rows (input:10000)
inserted 10000 rows (input:10000)
inserted 10000 rows (input:10000)
inserted 10000 rows (input:10000)
inserted 10000 rows (input:10000)
inserted 10000 rows (input:10000)
inserted 10000 rows (input:10000)
inserted 10000 rows (input:10000)
inserted 10000 rows (input:10000)
inserted 10000 rows (input:10000)
inserted 10000 rows (input:10000)
inserted 10000 rows (input:10000)
inserted 4517 rows (input:4517)
124517