Skip to content

Commit 8eff579

Browse files
committed
stream
1 parent 5eea5a9 commit 8eff579

File tree

2 files changed

+470
-0
lines changed

2 files changed

+470
-0
lines changed

stream/stream.go

+255
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,255 @@
1+
package stream
2+
3+
import (
4+
"bufio"
5+
"fmt"
6+
"io"
7+
"strings"
8+
"sync"
9+
10+
"github.com/mstgnz/sqlmapper"
11+
)
12+
13+
// StreamParser represents an interface for streaming database dump operations
14+
type StreamParser interface {
15+
// ParseStream parses a SQL dump from a reader and calls the callback for each parsed object
16+
ParseStream(reader io.Reader, callback func(SchemaObject) error) error
17+
18+
// ParseStreamParallel parses a SQL dump from a reader in parallel using worker pools
19+
ParseStreamParallel(reader io.Reader, callback func(SchemaObject) error, workers int) error
20+
21+
// GenerateStream generates SQL statements for schema objects and writes them to the writer
22+
GenerateStream(schema *sqlmapper.Schema, writer io.Writer) error
23+
}
24+
25+
// WorkerPool represents a pool of workers for parallel processing
26+
type WorkerPool struct {
27+
workers int
28+
jobs chan string
29+
results chan SchemaObject
30+
errors chan error
31+
wg sync.WaitGroup
32+
parser StreamParser
33+
}
34+
35+
// NewWorkerPool creates a new worker pool with the specified number of workers
36+
func NewWorkerPool(workers int, parser StreamParser) *WorkerPool {
37+
return &WorkerPool{
38+
workers: workers,
39+
jobs: make(chan string, workers),
40+
results: make(chan SchemaObject, workers),
41+
errors: make(chan error, workers),
42+
parser: parser,
43+
}
44+
}
45+
46+
// Start starts the worker pool
47+
func (wp *WorkerPool) Start() {
48+
for i := 0; i < wp.workers; i++ {
49+
wp.wg.Add(1)
50+
go wp.worker()
51+
}
52+
}
53+
54+
// worker processes jobs from the jobs channel
55+
func (wp *WorkerPool) worker() {
56+
defer wp.wg.Done()
57+
for statement := range wp.jobs {
58+
// Parse the SQL statement using a temporary reader
59+
err := wp.parser.ParseStream(strings.NewReader(statement), func(obj SchemaObject) error {
60+
wp.results <- obj
61+
return nil
62+
})
63+
64+
if err != nil {
65+
wp.errors <- fmt.Errorf("error processing statement: %v", err)
66+
return
67+
}
68+
}
69+
}
70+
71+
// Submit submits a new SQL statement to be processed
72+
func (wp *WorkerPool) Submit(statement string) {
73+
wp.jobs <- statement
74+
}
75+
76+
// Results returns the channel for receiving processed schema objects
77+
func (wp *WorkerPool) Results() <-chan SchemaObject {
78+
return wp.results
79+
}
80+
81+
// Errors returns the channel for receiving processing errors
82+
func (wp *WorkerPool) Errors() <-chan error {
83+
return wp.errors
84+
}
85+
86+
// Wait waits for all workers to finish processing
87+
func (wp *WorkerPool) Wait() {
88+
close(wp.jobs)
89+
wp.wg.Wait()
90+
close(wp.results)
91+
close(wp.errors)
92+
}
93+
94+
// Process processes a stream of SQL statements in parallel
95+
func (wp *WorkerPool) Process(reader io.Reader, callback func(SchemaObject) error) error {
96+
// Start the worker pool
97+
wp.Start()
98+
99+
// Start a goroutine to read statements and submit them to workers
100+
go func() {
101+
streamReader := NewStreamReader(reader, ";")
102+
for {
103+
statement, err := streamReader.ReadStatement()
104+
if err == io.EOF {
105+
break
106+
}
107+
if err != nil {
108+
wp.errors <- fmt.Errorf("error reading statement: %v", err)
109+
break
110+
}
111+
112+
statement = strings.TrimSpace(statement)
113+
if statement == "" {
114+
continue
115+
}
116+
wp.Submit(statement)
117+
}
118+
wp.Wait()
119+
}()
120+
121+
// Process results and handle errors
122+
for {
123+
select {
124+
case obj, ok := <-wp.Results():
125+
if !ok {
126+
return nil
127+
}
128+
if err := callback(obj); err != nil {
129+
return err
130+
}
131+
case err := <-wp.Errors():
132+
return err
133+
}
134+
}
135+
}
136+
137+
// SchemaObjectType enumerates the kinds of object a SchemaObject can carry.
type SchemaObjectType int

// Kinds of schema object. The values are assigned consecutively from zero in
// the order listed.
const (
	// TableObject marks a table.
	TableObject SchemaObjectType = iota
	// ViewObject marks a view.
	ViewObject
	// FunctionObject marks a function.
	FunctionObject
	// ProcedureObject marks a procedure.
	ProcedureObject
	// TriggerObject marks a trigger.
	TriggerObject
	// IndexObject marks an index.
	IndexObject
	// ConstraintObject marks a constraint.
	ConstraintObject
	// SequenceObject marks a sequence.
	SequenceObject
	// TypeObject marks a type.
	TypeObject
	// PermissionObject marks a permission.
	PermissionObject
)
152+
153+
// SchemaObject represents a parsed database object
154+
type SchemaObject struct {
155+
Type SchemaObjectType
156+
Data interface{} // Table, View, Function, etc.
157+
}
158+
159+
// StreamReader reads SQL statements one at a time from a buffered source.
type StreamReader struct {
	// reader is the buffered source the statements are read from.
	reader *bufio.Reader
	// delimiter is the text that terminates a statement.
	delimiter string
	// buffer is scratch space allocated by NewStreamReader.
	buffer []byte
}
165+
166+
// NewStreamReader creates a new StreamReader with the given reader and delimiter
167+
func NewStreamReader(reader io.Reader, delimiter string) *StreamReader {
168+
return &StreamReader{
169+
reader: bufio.NewReader(reader),
170+
delimiter: delimiter,
171+
buffer: make([]byte, 0, 4096),
172+
}
173+
}
174+
175+
// ReadStatement reads the next SQL statement from the reader
176+
func (sr *StreamReader) ReadStatement() (string, error) {
177+
var statement []byte
178+
inString := false
179+
inComment := false
180+
lineComment := false
181+
escaped := false
182+
183+
for {
184+
b, err := sr.reader.ReadByte()
185+
if err != nil {
186+
if err == io.EOF && len(statement) > 0 {
187+
return string(statement), nil
188+
}
189+
return "", err
190+
}
191+
192+
// Handle string literals
193+
if b == '\'' && !inComment && !escaped {
194+
inString = !inString
195+
}
196+
197+
// Handle escape characters
198+
if b == '\\' && !inComment {
199+
escaped = !escaped
200+
} else {
201+
escaped = false
202+
}
203+
204+
// Handle comments
205+
if !inString && !inComment && b == '-' {
206+
nextByte, err := sr.reader.ReadByte()
207+
if err == nil && nextByte == '-' {
208+
lineComment = true
209+
inComment = true
210+
continue
211+
}
212+
sr.reader.UnreadByte()
213+
}
214+
215+
if !inString && !inComment && b == '/' {
216+
nextByte, err := sr.reader.ReadByte()
217+
if err == nil && nextByte == '*' {
218+
inComment = true
219+
continue
220+
}
221+
sr.reader.UnreadByte()
222+
}
223+
224+
if inComment && !lineComment && b == '*' {
225+
nextByte, err := sr.reader.ReadByte()
226+
if err == nil && nextByte == '/' {
227+
inComment = false
228+
continue
229+
}
230+
sr.reader.UnreadByte()
231+
}
232+
233+
if lineComment && b == '\n' {
234+
inComment = false
235+
lineComment = false
236+
continue
237+
}
238+
239+
// Skip comments
240+
if inComment {
241+
continue
242+
}
243+
244+
// Add character to statement
245+
statement = append(statement, b)
246+
247+
// Check for delimiter
248+
if !inString && len(statement) >= len(sr.delimiter) {
249+
lastIdx := len(statement) - len(sr.delimiter)
250+
if string(statement[lastIdx:]) == sr.delimiter {
251+
return string(statement[:lastIdx]), nil
252+
}
253+
}
254+
}
255+
}

0 commit comments

Comments
 (0)