Skip to content

Commit c05917d

Browse files
committed
stream
1 parent 29d6b0f commit c05917d

File tree

10 files changed

+1969
-0
lines changed

10 files changed

+1969
-0
lines changed

mysql/mysql_stream.go

+119
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"fmt"
55
"io"
66
"strings"
7+
"sync"
78

89
"github.com/mstgnz/sqlmapper"
910
)
@@ -127,6 +128,124 @@ func (p *MySQLStreamParser) ParseStream(reader io.Reader, callback func(sqlmappe
127128
return nil
128129
}
129130

131+
// ParseStreamParallel implements parallel processing for MySQL stream parsing
132+
func (p *MySQLStreamParser) ParseStreamParallel(reader io.Reader, callback func(sqlmapper.SchemaObject) error, workers int) error {
133+
streamReader := sqlmapper.NewStreamReader(reader, ";")
134+
statements := make(chan string, workers)
135+
results := make(chan sqlmapper.SchemaObject, workers)
136+
errors := make(chan error, workers)
137+
var wg sync.WaitGroup
138+
139+
// Start worker goroutines
140+
for i := 0; i < workers; i++ {
141+
wg.Add(1)
142+
go func() {
143+
defer wg.Done()
144+
for statement := range statements {
145+
obj, err := p.parseStatement(statement)
146+
if err != nil {
147+
errors <- err
148+
return
149+
}
150+
if obj != nil {
151+
results <- *obj
152+
}
153+
}
154+
}()
155+
}
156+
157+
// Start a goroutine to close results channel after all workers are done
158+
go func() {
159+
wg.Wait()
160+
close(results)
161+
}()
162+
163+
// Start a goroutine to read statements and send them to workers
164+
go func() {
165+
for {
166+
statement, err := streamReader.ReadStatement()
167+
if err == io.EOF {
168+
break
169+
}
170+
if err != nil {
171+
errors <- fmt.Errorf("error reading statement: %v", err)
172+
break
173+
}
174+
175+
statement = strings.TrimSpace(statement)
176+
if statement == "" {
177+
continue
178+
}
179+
statements <- statement
180+
}
181+
close(statements)
182+
}()
183+
184+
// Process results and handle errors
185+
for obj := range results {
186+
if err := callback(obj); err != nil {
187+
return err
188+
}
189+
}
190+
191+
// Check for any errors from workers
192+
select {
193+
case err := <-errors:
194+
return err
195+
default:
196+
return nil
197+
}
198+
}
199+
200+
// parseStatement parses a single SQL statement and returns a SchemaObject
201+
func (p *MySQLStreamParser) parseStatement(statement string) (*sqlmapper.SchemaObject, error) {
202+
upperStatement := strings.ToUpper(statement)
203+
204+
switch {
205+
case strings.HasPrefix(upperStatement, "CREATE TABLE"):
206+
table, err := p.parseTableStatement(statement)
207+
if err != nil {
208+
return nil, err
209+
}
210+
return &sqlmapper.SchemaObject{
211+
Type: sqlmapper.TableObject,
212+
Data: table,
213+
}, nil
214+
215+
case strings.HasPrefix(upperStatement, "CREATE VIEW"):
216+
view, err := p.parseViewStatement(statement)
217+
if err != nil {
218+
return nil, err
219+
}
220+
return &sqlmapper.SchemaObject{
221+
Type: sqlmapper.ViewObject,
222+
Data: view,
223+
}, nil
224+
225+
case strings.HasPrefix(upperStatement, "CREATE FUNCTION"):
226+
function, err := p.parseFunctionStatement(statement)
227+
if err != nil {
228+
return nil, err
229+
}
230+
return &sqlmapper.SchemaObject{
231+
Type: sqlmapper.FunctionObject,
232+
Data: function,
233+
}, nil
234+
235+
case strings.HasPrefix(upperStatement, "CREATE PROCEDURE"):
236+
procedure, err := p.parseProcedureStatement(statement)
237+
if err != nil {
238+
return nil, err
239+
}
240+
return &sqlmapper.SchemaObject{
241+
Type: sqlmapper.ProcedureObject,
242+
Data: procedure,
243+
}, nil
244+
}
245+
246+
return nil, nil
247+
}
248+
130249
// GenerateStream implements the StreamParser interface
131250
func (p *MySQLStreamParser) GenerateStream(schema *sqlmapper.Schema, writer io.Writer) error {
132251
// Generate tables

oracle/oracle.go

+35
Original file line numberDiff line numberDiff line change
@@ -556,3 +556,38 @@ func (o *Oracle) Generate(schema *sqlmapper.Schema) (string, error) {
556556

557557
return result.String(), nil
558558
}
559+
560+
func (o *Oracle) parseTables(statement string) error {
561+
// TODO: Implement table parsing
562+
return nil
563+
}
564+
565+
func (o *Oracle) parseViews(statement string) error {
566+
// TODO: Implement view parsing
567+
return nil
568+
}
569+
570+
func (o *Oracle) parseFunctions(statement string) error {
571+
// TODO: Implement function parsing
572+
return nil
573+
}
574+
575+
func (o *Oracle) parseTriggers(statement string) error {
576+
// TODO: Implement trigger parsing
577+
return nil
578+
}
579+
580+
func (o *Oracle) parseSequences(statement string) error {
581+
// TODO: Implement sequence parsing
582+
return nil
583+
}
584+
585+
func (o *Oracle) parseTypes(statement string) error {
586+
// TODO: Implement type parsing
587+
return nil
588+
}
589+
590+
func (o *Oracle) parseIndexes(statement string) error {
591+
// TODO: Implement index parsing
592+
return nil
593+
}

0 commit comments

Comments
 (0)