forked from ami-GS/go-cdds
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathreader.go
155 lines (139 loc) · 4.2 KB
/
reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
package gocdds
/*
#cgo CFLAGS: -I/usr/include
#cgo LDFLAGS: -lddsc
#include "ddsc/dds.h"
*/
import "C"
import (
"time"
"unsafe"
)
// ReadCondition is the Go-side handle for a DDS read condition. Values are
// built by Reader.CreateReadCondition, which stores the handle returned by
// dds_create_readcondition in the embedded Entity.
type ReadCondition struct {
Entity
}
// Reader wraps a DDS reader entity and offers several ways to read or take
// samples from it through dds_read / dds_take.
type Reader struct {
Entity
// allocator hands out (and later frees) the sample arrays used by the
// Alloc* and *Read helpers; Free and delete tolerate it being nil.
allocator *SampleAllocator
// readConditions records every condition made by CreateReadCondition so
// that delete can delete them again.
readConditions []ReadCondition
}
// Read fetches samples from the reader into the caller-supplied buffers.
//
// With take == false the call goes through dds_read, which (per the original
// note) just returns a copy of the data; with take == true it goes through
// dds_take, which removes the data after reading.
// See https://github.com/eclipse/cyclonedds/issues/17
//
// samples, info, bufsz and maxsz are forwarded to the C call as-is. A negative
// return code is reported as a CddsErrorType; any other result yields nil
// (the number of samples read is not returned to the caller).
func (r *Reader) Read(samples *unsafe.Pointer, info *SampleInfo, bufsz int, maxsz uint32, take bool) error {
	var (
		infos = (*C.dds_sample_info_t)(info)
		size  = C.size_t(bufsz)
		limit = C.uint32_t(maxsz)
		rc    C.dds_entity_t
	)
	switch {
	case take:
		rc = C.dds_take(r.GetEntity(), samples, infos, size, limit)
	default:
		rc = C.dds_read(r.GetEntity(), samples, infos, size, limit)
	}
	if rc >= 0 {
		return nil
	}
	return CddsErrorType(rc)
}
// ReadWithCallback performs a single BlockAllocRead, hands every returned
// sample to callback, releases the sample buffer and then reports completion
// on *finCh.
//
// Exactly one value is sent on *finCh per call: the error from BlockAllocRead
// if the read failed, otherwise nil after all callbacks have returned and the
// buffer has been freed. The send blocks until it can proceed, so callers
// normally run this method in its own goroutine.
//
// bufsz, maxsz and take are forwarded unchanged to BlockAllocRead.
func (r *Reader) ReadWithCallback(bufsz int, maxsz uint32, take bool, finCh *chan error, callback func(unsafe.Pointer)) {
	// WARN: currently this might have issue when participant.Delete()
	// TODO: allocate first, then reuse the buffer in a loop
	// TODO: need a choice to run this forever
	samples, num, err := r.BlockAllocRead(bufsz, maxsz, take)
	if err != nil {
		*finCh <- err
		// Stop here: BlockAllocRead returns a nil array on failure, so
		// touching samples.head below would panic, and the channel must not
		// receive a second (nil) value for the same call.
		return
	}
	// TODO: change to foreach? but that of array might not work (just guessed)
	for i := 0; i < num; i++ {
		callback(samples.At(i))
	}
	// Release the buffer before signalling completion so the receiver never
	// observes "done" while the samples are still allocated.
	r.allocator.Free(unsafe.Pointer(samples.head))
	*finCh <- nil
}
// BlockAllocRead allocates a sample array and polls the reader until at least
// bufsz valid samples have been counted, sleeping 20ms after every poll.
//
// Each poll calls dds_take (take == true) or dds_read (take == false) with the
// array slot and sample-info slot at the current fill index. A negative
// return code aborts the call: the array is released and the code is returned
// as a CddsErrorType. On success the array and bufsz are returned; the caller
// owns the array and must free it (see Reader.Free).
//
// NOTE(review): the array is allocated with maxsz elements while the loop
// bound is bufsz, and every poll passes the full bufsz/maxsz to the C call
// regardless of how many slots are already filled — confirm against the
// allocator and the dds_read contract that this cannot write past the array.
// NOTE(review): only samples flagged valid_data advance the fill index, so an
// invalid sample in the middle of a batch is not compacted away — confirm
// whether callers rely on that.
func (r *Reader) BlockAllocRead(bufsz int, maxsz uint32, take bool) (*Array, int, error) {
	// this is not GCed by Golang, maybe
	samples := r.allocator.AllocArray(maxsz)
	var ret C.dds_return_t
	for i := 0; i < bufsz; {
		loc := samples.At(i)
		info := (*C.dds_sample_info_t)(samples.InfoAt(i))
		if take {
			ret = C.dds_take(r.GetEntity(), &loc, info, C.size_t(bufsz), C.uint32_t(maxsz))
		} else {
			ret = C.dds_read(r.GetEntity(), &loc, info, C.size_t(bufsz), C.uint32_t(maxsz))
		}
		if ret < 0 {
			// The caller only gets nil back and therefore cannot release the
			// array itself; free it here instead of leaving it allocated.
			r.allocator.Free(unsafe.Pointer(samples.head))
			return nil, 0, CddsErrorType(ret)
		}
		// The infos of this batch start at the index the read was issued with.
		// Keep that base fixed while counting: indexing with the moving i
		// would skip entries and under-count valid samples.
		base := i
		for j := 0; j < int(ret); j++ {
			batchInfo := (*C.dds_sample_info_t)(samples.InfoAt(base + j))
			if batchInfo.valid_data {
				i++
			}
		}
		time.Sleep(time.Millisecond * 20)
	}
	return samples, bufsz, nil
}
// AllocRead allocates a sample array with maxsz elements and performs one
// non-blocking dds_take (take == true) or dds_read (take == false) into it.
//
// On success it returns the array together with the count reported by the C
// call; the caller owns the array and must free it (see Reader.Free). When
// the C call returns a negative code, the array is released again and the
// code is returned as a CddsErrorType with a nil array and a zero count.
func (r *Reader) AllocRead(bufsz int, maxsz uint32, take bool) (*Array, int, error) {
	// this is not Garbage Collected by Golang, maybe
	samples := r.allocator.AllocArray(maxsz)
	loc := samples.At(0)
	var ret C.dds_entity_t
	if take {
		ret = C.dds_take(r.GetEntity(), &loc, (*C.dds_sample_info_t)(samples.InfoAt(0)), C.size_t(bufsz), C.uint32_t(maxsz))
	} else {
		ret = C.dds_read(r.GetEntity(), &loc, (*C.dds_sample_info_t)(samples.InfoAt(0)), C.size_t(bufsz), C.uint32_t(maxsz))
	}
	if ret < 0 {
		// The caller receives nil and has no handle to free, so release the
		// array here rather than keeping it allocated after every failed read.
		r.allocator.Free(unsafe.Pointer(samples.head))
		return nil, 0, CddsErrorType(ret)
	}
	return samples, int(ret), nil
}
// Alloc asks the reader's allocator for a new sample Array, passing bufsz
// (converted to uint32) as the requested size. The result can be handed to
// ReadWithBuff.
func (r *Reader) Alloc(bufsz int) *Array {
	// this is not Garbage Collected by Golang, maybe
	size := uint32(bufsz)
	return r.allocator.AllocArray(size)
}
// ReadWithBuff performs one dds_take (take == true) or dds_read
// (take == false) into an array the caller allocated earlier, starting at
// slot 0, and returns the count reported by the C call.
//
// It panics when samples is nil. A negative return code is reported as a
// CddsErrorType with a zero count.
//
// NOTE(review): samples.elmSize is passed as both the buffer size and the
// maximum sample count — confirm it holds an element count, not a byte size.
func (r *Reader) ReadWithBuff(samples *Array, take bool) (int, error) {
	// this is not Garbage Collected by Golang, maybe
	if samples == nil {
		panic("buffer was not allocated")
	}
	loc := samples.At(0)
	var rc C.dds_entity_t
	if !take {
		rc = C.dds_read(r.GetEntity(), &loc, (*C.dds_sample_info_t)(samples.InfoAt(0)), C.size_t(samples.elmSize), C.uint32_t(samples.elmSize))
	} else {
		rc = C.dds_take(r.GetEntity(), &loc, (*C.dds_sample_info_t)(samples.InfoAt(0)), C.size_t(samples.elmSize), C.uint32_t(samples.elmSize))
	}
	if rc >= 0 {
		return int(rc), nil
	}
	return 0, CddsErrorType(rc)
}
// CreateReadCondition creates a read condition on this reader for the given
// state mask via dds_create_readcondition and remembers it so that delete
// can remove it later.
//
// The returned pointer refers to a separate copy of the value appended to the
// reader's internal list. The handle returned by the C call is stored without
// being checked for an error code.
func (r *Reader) CreateReadCondition(mask ReadConditionState) *ReadCondition {
	handle := C.dds_create_readcondition(r.GetEntity(), C.uint32_t(mask))
	cond := ReadCondition{Entity: Entity{ent: handle, qos: nil}}
	r.readConditions = append(r.readConditions, cond)
	return &cond
}
// Free releases the sample buffer starting at sampleHead through the reader's
// allocator. It does nothing when the reader has no allocator.
func (r *Reader) Free(sampleHead unsafe.Pointer) {
	if r.allocator == nil {
		return
	}
	r.allocator.Free(sampleHead)
}
// delete releases the resources this reader holds on the Go side: everything
// still held by the allocator, the QoS object, and each read condition made
// by CreateReadCondition. It stops at, and returns, the first error from a
// read condition's delete; later conditions are then left untouched.
//
// The reader entity itself is not deleted here.
func (r *Reader) delete() error {
	if alloc := r.allocator; alloc != nil {
		alloc.AllFree()
	}
	if r.qos != nil {
		r.qos.delete()
	}
	for i := range r.readConditions {
		// Work on a copy of the element, as a value-range loop would.
		cond := r.readConditions[i]
		// TODO: be careful, this might be deleted via participant.Delete(), need to check in the future
		if err := cond.delete(); err != nil {
			return err
		}
	}
	// reader entity will be deleted by participant, no need to call from here
	//r.Entity.Delete()
	return nil
}