-
Notifications
You must be signed in to change notification settings - Fork 0
/
consumer_server.js
142 lines (129 loc) · 4.07 KB
/
consumer_server.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
const fs = require('fs');
var async = require("async");
const express = require('express');
const readline = require('readline');
var ffmpegs = require('fluent-ffmpeg');
const amqp = require('amqplib/callback_api');
var app = express();
const textVar='amirsorouri00'
const fileDirs = '/home/amirsorouri/Desktop/stream/amirh/semi'; // Directory that input files are stored
const fileTmp = '/home/amirsorouri/Desktop/stream/amirh/semi/tmp'; // Directory that input files are stored
const movieLocs = '/home/amirsorouri/Desktop/stream/amirh/semi/f2.mp4'; // Directory that input files are stored
const readInterface = readline.createInterface({
input: fs.createReadStream(fileDirs+'/keyFrameTimeList.txt'),
console: true
});
var mapStart = new Map();
var mapEnd = new Map();
let ii = 0;
readInterface.on('line', function (line) {
mapStart.set("stream".concat(ii).concat(".ts"), line)
if(ii==0)
console.log(mapStart)
if (ii > 0)
mapEnd.set("stream".concat(ii - 1).concat(".ts"), line)
ii++
});
function watermarker(proc) {
return new Promise(resolve => {
setTimeout(() => {
resolve(proc.run());
}, 2000);
});
}
async function try_to_watermark(proc){
await watermarker(proc);
}
amqp.connect('amqp://localhost', function(error0, connection) {
if (error0) {
throw error0;
}
connection.createChannel(function(error1, channel) {
if (error1) {
throw error1;
}
var queue = 'node_queue';
channel.assertQueue(queue, {
durable: true
});
channel.prefetch(1);
console.log("Waiting for messages in %s", queue);
channel.consume(queue, function(msg) {
console.log("Received '%s'", msg.content.toString());
var head = Number(msg.content.toString());
// console.log(head)
var tmp = head + (head - 1)*2;
console.log("Start From ", tmp)
for (var i = 0; i < 3; i++, tmp++) {
final = 'stream'.concat(tmp).concat('.ts');
// console.log(final)
if (fs.existsSync(fileDirs + '/' + final)) {
console.log('*** Another process doing ', final + " ***");
continue;
}
if(mapStart.get(final) && mapEnd.get(final)) {
var proc = convert_segment_vod(mapStart.get(final), mapEnd.get(final), movieLocs, final, textVar);
try_to_watermark(proc).then(() => {
console.log("Inside try_to_watermark ", final);
// console.log("SUBPROCESS ", sgStartNo, ": ",final, " now exist.");
})
}
else {
console.log(final, " is out of range.")
break;
}
}
// console.log(head);
setTimeout(function() {
channel.ack(msg);
}, 50);
});
});
});
function convert_segment_vod(mapStart, mapEnd, movieLocs, final, textVar){
var proc = ffmpegs(movieLocs)
.videoFilters({
filter: 'drawtext',
options: {
text: textVar,
fontsize: 36,
fontcolor: 'white',
x: '(main_w/2-text_w/2)',
y: '(text_h/2)+15',
shadowcolor: 'black',
shadowx: 2,
shadowy: 2
}
})
.videoCodec('libx264')
.inputOption([
'-ss '.concat(mapStart),
'-itsoffset '.concat(mapStart)
])
.addOptions([
'-acodec copy'
])
.format('mpegts')
.addOptions([
'-t '.concat(mapEnd)
])
.on('end', function (stdout, stderr) {
console.log('SUBPROCESS: Transcoding succeeded !', movieLocs, final);
fs.rename(fileTmp + '/'+ final, fileDirs + '/' + final, function (err) {
if (err) {
if (err.code === 'EXDEV') {
console.log("Error 1")
} else {
console.log("Error 2")
}
return;
}
});
})
.on('error', function (err) {
console.log('SUBPROCESS: an error happened: ' + err.message);
})
.output(fileTmp+'/'+final)
// .pipe(res)
return proc;
}