Is Firehose unstable? #2808
-
I use Firehose( https://docs.bsky.app/docs/advanced-guides/firehose ) to run trend analysis bot: https://bsky.app/profile/lamrongol.bsky.social/lists/3kob6kalezl2a Following my program log, which shows posts per minutes: I use Firehose by running javascript code minimal javascript code is: const ws = new WebSocket('wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos')
ws.addEventListener('message', async (event) => {
const messageBuf = await event.data
const [header, body] = decodeMultiple(new Uint8Array(messageBuf))
if (header.op !== MSG_OP) return
// main code
} |
Beta Was this translation helpful? Give feedback.
Replies: 3 comments
-
This was very simple problem, I started to use Firehose when document is WIP and didn't use |
Beta Was this translation helpful? Give feedback.
-
Sorry connection dropped just after previous post. // import { CID } from 'https://cdn.jsdelivr.net/npm/[email protected]/+esm'
// import { CarReader } from 'https://cdn.jsdelivr.net/npm/@ipld/[email protected]/+esm'
// import { decode, decodeMultiple, addExtension } from 'https://cdn.jsdelivr.net/npm/[email protected]/+esm'
import { CID } from 'multiformats/cid'
import { CarReader } from '@ipld/car'
import { decode, decodeMultiple, addExtension } from 'cbor-x'
const MSG_OP = 1
const POST_TYPE = 'app.bsky.feed.post'
const LIKE_TYPE = 'app.bsky.feed.like'
const REPOST_TYPE = 'app.bsky.feed.repost'
const POST_STR_LENGTH = POST_TYPE.length + 1;
const encoder = new TextEncoder();
const date = new Date();
const dateStr = date.getFullYear().toString() + (date.getMonth() + 1).toString().padStart(2, '0') + date.getDate().toString().padStart(2, '0');
function main() {
let count = 0
setInterval(() => {
const today = Date(Date.now());
console.log(today);
console.log(count);
count = 0;
}, 60 * 1000)
const ws = new WebSocket('wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos')
ws.addEventListener('message', async (event) => {
const messageBuf = await event.data
const [header, body] = decodeMultiple(new Uint8Array(messageBuf))
if (header.op !== MSG_OP) return
//ダメ//if (count%100==0) console.log("header:\n" + JSON.stringify(header))
let car
if (!body.blocks) return;
try {
// console.log(typeof(body.blocks))
car = await CarReader.fromBytes(body.blocks)
} catch (e) {
console.error(e)
return
}
for (const op of body.ops) {
if (!op.cid) continue
const block = await car.get(op.cid)
const record = decode(block.bytes)
delete body.blocks;
// console.log(body)
// console.log(record)
if (record.$type === POST_TYPE && typeof record.text === 'string' && body.repo/* did */) {
//const rkey = op.path.split('/').at(-1)
//console.log(record.text.replace((/[\n\r\t]+/g, '')))
count++;
}
}
}
)
}
const file = Bun.file("./Data/rawData" + dateStr + ".json");
const writer = file.writer();
function writeRawData(record) {
const data = encoder.encode(JSON.stringify(record));
writer.write(data);
}
addExtension({
Class: CID,
tag: 42,
encode: () => {
throw new Error('cannot encode cids')
},
decode: (bytes) => {
if (bytes[0] !== 0) {
throw new Error('invalid cid for cbor tag 42')
}
return CID.decode(bytes.subarray(1)) // ignore leading 0x00
},
})
main() |
Beta Was this translation helpful? Give feedback.
-
I found this is just a problem of Java side(processing text is too slow). Sorry. |
Beta Was this translation helpful? Give feedback.
I found this is just a problem of Java side(processing text is too slow). Sorry.