recordreader.cpp
1 /***************************************************************************
2  Copyright (C) 2002-2015 Kentaro Kitagawa
3  kitagawa@phys.s.u-tokyo.ac.jp
4 
5  This program is free software; you can redistribute it and/or
6  modify it under the terms of the GNU Library General Public
7  License as published by the Free Software Foundation; either
8  version 2 of the License, or (at your option) any later version.
9 
10  You should have received a copy of the GNU Library General
11  Public License and a list of authors along with this program;
12  see the files COPYING and AUTHORS.
13 ***************************************************************************/
14 #include "recordreader.h"
15 #include "analyzer.h"
16 #include "primarydriver.h"
17 #include "xtime.h"
18 #include "measure.h"
19 
20 #include <zlib.h>
21 #include <vector>
22 
// File-local configuration macros.
// IFSMODE is not referenced anywhere in the visible part of this file.
23 #define IFSMODE std::ios::in
// Labels of the playback-speed choices. The constructor adds them to the
// "Speed" combo node and onPlayCondChanged() compares the selection against them.
24 #define SPEED_FASTEST "Fastest"
25 #define SPEED_FAST "Fast"
26 #define SPEED_NORMAL "Normal"
27 #define SPEED_SLOW "Slow"
28 
// RECORDREADER_DELAY is not referenced in the visible part of this file.
29 #define RECORDREADER_DELAY 20
// Number of worker threads the constructor creates to run execute().
30 #define RECORD_READER_NUM_THREADS 1
31 
32 XRawStreamRecordReader::XIOError::XIOError(const char *file, int line)
33  : XRecordError(i18n("IO Error"), file, line) {}
34 XRawStreamRecordReader::XIOError::XIOError(const XString &msg, const char *file, int line)
35  : XRecordError(msg, file, line) {}
36 XRawStreamRecordReader::XBufferOverflowError::XBufferOverflowError(const char *file, int line)
37  : XIOError(i18n("Buffer Overflow Error"), file, line) {}
38 XRawStreamRecordReader::XBrokenRecordError::XBrokenRecordError(const char *file, int line)
39  : XRecordError(i18n("Broken Record Error"), file, line) {}
40 XRawStreamRecordReader::XNoDriverError::
41 XNoDriverError(const XString &driver_name, const char *file, int line)
42  : XRecordError(i18n("No Driver Error: ") + driver_name, file, line),
43  name(driver_name) {}
44 
45 XRawStreamRecordReader::XRawStreamRecordReader(const char *name, bool runtime, const shared_ptr<XDriverList> &driverlist)
46  : XRawStream(name, runtime, driverlist),
47  m_speed(create<XComboNode>("Speed", true, true)),
48  m_fastForward(create<XBoolNode>("FastForward", true)),
49  m_rewind(create<XBoolNode>("Rewind", true)),
50  m_stop(create<XTouchableNode>("Stop", true)),
51  m_first(create<XTouchableNode>("First", true)),
52  m_next(create<XTouchableNode>("Next", true)),
53  m_back(create<XTouchableNode>("Back", true)),
54  m_posString(create<XStringNode>("PosString", true)),
55  m_periodicTerm(0) {
56 
57  iterate_commit([=](Transaction &tr){
58  tr[ *m_speed].add(SPEED_FASTEST);
59  tr[ *m_speed].add(SPEED_FAST);
60  tr[ *m_speed].add(SPEED_NORMAL);
61  tr[ *m_speed].add(SPEED_SLOW);
62  tr[ *m_speed] = SPEED_FAST;
63 
64  m_lsnOnOpen = tr[ *filename()].onValueChanged().connectWeakly(
65  shared_from_this(), &XRawStreamRecordReader::onOpen);
66  m_lsnFirst = tr[ *m_first].onTouch().connectWeakly(
67  shared_from_this(), &XRawStreamRecordReader::onFirst,
68  XListener::FLAG_MAIN_THREAD_CALL | XListener::FLAG_AVOID_DUP | XListener::FLAG_DELAY_ADAPTIVE);
69  m_lsnBack = tr[ *m_back].onTouch().connectWeakly(
70  shared_from_this(), &XRawStreamRecordReader::onBack,
71  XListener::FLAG_MAIN_THREAD_CALL | XListener::FLAG_AVOID_DUP | XListener::FLAG_DELAY_ADAPTIVE);
72  m_lsnNext = tr[ *m_next].onTouch().connectWeakly(
73  shared_from_this(), &XRawStreamRecordReader::onNext,
74  XListener::FLAG_MAIN_THREAD_CALL | XListener::FLAG_AVOID_DUP | XListener::FLAG_DELAY_ADAPTIVE);
75  m_lsnStop = tr[ *m_stop].onTouch().connectWeakly(
76  shared_from_this(), &XRawStreamRecordReader::onStop,
77  XListener::FLAG_MAIN_THREAD_CALL | XListener::FLAG_AVOID_DUP | XListener::FLAG_DELAY_ADAPTIVE);
78  m_lsnPlayCond = tr[ *m_fastForward].onValueChanged().connectWeakly(
79  shared_from_this(),
80  &XRawStreamRecordReader::onPlayCondChanged,
81  XListener::FLAG_MAIN_THREAD_CALL | XListener::FLAG_AVOID_DUP | XListener::FLAG_DELAY_ADAPTIVE);
82  tr[ *m_rewind].onValueChanged().connect(m_lsnPlayCond);
83  tr[ *m_speed].onValueChanged().connect(m_lsnPlayCond);
84  });
85 
86  m_threads.resize(RECORD_READER_NUM_THREADS);
87  for(auto it = m_threads.begin(); it != m_threads.end(); it++) {
88  it->reset(new XThread<XRawStreamRecordReader>(shared_from_this(),
89  &XRawStreamRecordReader::execute));
90  ( *it)->resume();
91  }
92 }
93 void
94 XRawStreamRecordReader::onOpen(const Snapshot &shot, XValueNodeBase *) {
95  if(m_pGFD) gzclose(static_cast<gzFile>(m_pGFD));
96  m_pGFD = gzopen(QString(( **filename())->to_str()).toLocal8Bit().data(), "rb");
97 }
// Reads the fixed-size header of the record at the current stream position
// and stores its fields in m_allsize and m_time.
// Throws XIOError if the stream is already at end-of-file or if gzread()
// reports an error.
// NOTE(review): the listing line between the signature and the first body
// line (presumably the exception specification and opening brace) is missing
// from this copy of the file.
98 void
99 XRawStreamRecordReader::readHeader(void *_fd)
101 gzFile fd = static_cast<gzFile>(_fd);
102 
103 if(gzeof(fd))
104 throw XIOError(__FILE__, __LINE__);
// Header layout: total record size followed by the timestamp (sec, usec).
105 uint32_t size =
106 sizeof(uint32_t) //allsize
107 + sizeof(int32_t) //time().sec()
108 + sizeof(int32_t); //time().usec()
109 std::vector<char> buf(size);
110 XPrimaryDriver::RawDataReader reader(buf);
// NOTE(review): only a return value of -1 is treated as failure; a read that
// returns fewer than `size` bytes is not detected here.
111 if(gzread(fd, &buf[0], size) == -1) throw XIOError(__FILE__, __LINE__);
// Fields are popped in the same order as the layout above.
112 m_allsize = reader.pop<uint32_t>();
113 long sec = reader.pop<int32_t>();
114 long usec = reader.pop<int32_t>();
115 m_time = XTime(sec, usec);
116 }
// Reads one complete record from the stream and hands its payload to the
// driver named in the record.
// NOTE(review): the signature lines are missing from this copy of the file.
// Callers in this file invoke it as parseOne(m_pGFD, m_filemutex), and the
// body refers to the parameters as `_fd` and `mutex`.
// On the success path `mutex` is unlocked here before the driver is notified;
// on every exception path it is left locked and the callers unlock it in
// their catch handlers.
117 void
120 gzFile fd = static_cast<gzFile>(_fd);
121 
// Header first (fills m_allsize and m_time), then two NUL-terminated strings:
// the driver name and a reserved field.
122 readHeader(fd);
123 char name[256], sup[256];
124 gzgetline(fd, (unsigned char*)name, 256, '\0');
125 gzgetline(fd, (unsigned char*)sup, 256, '\0');
126 if(strlen(name) == 0) {
127 throw XBrokenRecordError(__FILE__, __LINE__);
128 }
// Look the driver up by name; the cast is null if the child is not an
// XPrimaryDriver.
129 shared_ptr<XNode> driver_precast = m_drivers->getChild(name);
130 auto driver = dynamic_pointer_cast<XPrimaryDriver>(driver_precast);
// Payload size = total record size minus everything that is not payload.
// NOTE(review): the arithmetic is unsigned, so an m_allsize smaller than the
// overhead wraps to a large value; presumably that is then rejected by the
// MAX_RAW_RECORD_SIZE check below - confirm.
131 uint32_t size =
132 m_allsize - (
133 sizeof(uint32_t) //allsize
134 + sizeof(int32_t) //time().sec()
135 + sizeof(int32_t) //time().usec()
136 + strlen(name) //name of driver
137 + strlen(sup) //reserved
138 + 2 //two null chars
139 + sizeof(uint32_t) //allsize
140 );
141 // m_time must be copied before unlocking
142 XTime time(m_time);
// Publish the record's timestamp as the position string.
143 trans( *m_posString) = time.getTimeStr();
// Unusable record: skip over the payload and the trailing size field, then
// report why. A known driver with an oversized payload is a broken record;
// otherwise the driver is either of the wrong type or absent.
144 if( !driver || (size > MAX_RAW_RECORD_SIZE)) {
145 if(gzseek(fd, size + sizeof(uint32_t), SEEK_CUR) == -1)
146 throw XIOError(__FILE__, __LINE__);
147 if(driver)
148 throw XBrokenRecordError(__FILE__, __LINE__);
149 if(driver_precast)
150 throw XNoDriverError(formatString_tr(I18N_NOOP("Typemismatch: %s"), name),
151 __FILE__, __LINE__);
152 else
153 throw XNoDriverError(name, __FILE__, __LINE__);
154 }
155 auto rawdata = std::make_shared<XPrimaryDriver::RawData>();
156 try {
// Read the payload, then the trailing copy of the total size, which must
// match the value from the header.
// NOTE(review): as in readHeader(), short reads are not detected.
157 rawdata->resize(size);
158 if(gzread(fd, &rawdata->at(0), size) == -1)
159 throw XIOError(__FILE__, __LINE__);
160 std::vector<char> buf(sizeof(uint32_t));
161 if(gzread(fd, &buf[0], sizeof(uint32_t)) == -1)
162 throw XIOError(__FILE__, __LINE__);
163 XPrimaryDriver::RawDataReader reader(buf);
164 uint32_t footer_allsize = reader.pop<uint32_t>();
165 if(footer_allsize != m_allsize)
166 throw XBrokenRecordError(__FILE__, __LINE__);
167 }
168 catch (XRecordError &e) {
// The driver is still notified, with default-constructed times (presumably
// to mark the data as invalid - confirm against XPrimaryDriver).
169 driver->finishWritingRaw(rawdata, XTime(), XTime());
// NOTE(review): `throw e;` throws a copy whose type is XRecordError, so the
// derived type (XIOError / XBrokenRecordError) is lost; a bare `throw;`
// would rethrow the original object.
170 throw e;
171 }
// File access is finished; release the caller's lock before the driver call,
// which is serialized by m_drivermutex instead.
172 mutex.unlock();
173 { XScopedLock<XMutex> lock(m_drivermutex);
174 driver->finishWritingRaw(rawdata, XTime::now(), time);
175 }
176 }
177 void
178 XRawStreamRecordReader::gzgetline(void* _fd, unsigned char*buf, unsigned int len, int del)
179  throw (XIOError &) {
180  gzFile fd = static_cast<gzFile>(_fd);
181 
182  int c;
183  for(unsigned int i = 0; i < len; i++) {
184  c = gzgetc(fd);
185  if(c == -1) throw XIOError(__FILE__, __LINE__);
186  *(buf++) = (unsigned char)c;
187  if(c == del) return;
188  }
189  throw XBufferOverflowError(__FILE__, __LINE__);
190 }
// Rewinds the gzip stream to its beginning.
// NOTE(review): the signature lines are missing from this copy of the file.
// This is presumably first_(void *fd), which onFirst() calls as
// first_(m_pGFD) - confirm against the header.
191 void
194 gzrewind(static_cast<gzFile>(fd));
195 }
// Moves the stream back to the start of the record that ends at the current
// position. It first steps back over the trailing size field, then
// goToHeader() reads that field and seeks back by the recorded size.
// Throws XIOError if the seek fails.
// NOTE(review): the listing line between the signature and the first body
// line (presumably the exception specification and opening brace) is missing
// from this copy of the file.
196 void
197 XRawStreamRecordReader::previous_(void *fd)
// NOTE(review): -sizeof(uint32_t) negates an unsigned value; the result is
// only -4 after conversion to the offset type when that type is no wider
// than size_t - confirm for the target platforms.
199 if(gzseek(static_cast<gzFile>(fd), -sizeof(uint32_t), SEEK_CUR) == -1) throw XIOError(__FILE__, __LINE__);
200 goToHeader(fd);
201 }
// Skips the record at the current position: reads its header to learn the
// total size (m_allsize), then seeks past the remainder of the record.
// Throws XIOError if the seek fails; readHeader() may throw as well.
// NOTE(review): the listing line between the signature and the first body
// line (presumably the exception specification and opening brace) is missing
// from this copy of the file.
202 void
203 XRawStreamRecordReader::next_(void *fd)
205 readHeader(fd);
// Number of bytes readHeader() has already consumed.
206 uint32_t headersize = sizeof(uint32_t) //allsize
207 + sizeof(int32_t) //time().sec()
208 + sizeof(int32_t); //time().usec()
209 if(gzseek(static_cast<gzFile>(fd), m_allsize - headersize, SEEK_CUR) == -1) throw XIOError(__FILE__, __LINE__);
210 }
// Expects the stream to be positioned at a record's trailing size field.
// Reads that field and seeks backwards by the recorded total size, which
// leaves the stream at the start of that record.
// Throws XIOError at end-of-file or when the read/seek is reported as failed.
// NOTE(review): the listing line between the signature and the first body
// line (presumably the exception specification and opening brace) is missing
// from this copy of the file.
211 void
212 XRawStreamRecordReader::goToHeader(void *_fd)
214 gzFile fd = static_cast<gzFile>(_fd);
215 
216 if(gzeof(fd)) throw XIOError(__FILE__, __LINE__);
217 std::vector<char> buf(sizeof(uint32_t));
218 XPrimaryDriver::RawDataReader reader(buf);
// NOTE(review): the result is compared with Z_NULL (0) here, whereas the
// other gzread() calls in this file compare with -1; an error return of -1
// is therefore not caught by this check.
219 if(gzread(fd, &buf[0], sizeof(uint32_t)) == Z_NULL) throw XIOError(__FILE__, __LINE__);
// The unsigned size is stored in a signed int so that it can be negated.
220 int allsize = reader.pop<uint32_t>();
221 if(gzseek(fd, -allsize, SEEK_CUR) == -1) throw XIOError(__FILE__, __LINE__);
222 }
223 void
224 XRawStreamRecordReader::terminate() {
225  m_periodicTerm = 0;
226  for(auto &&x: m_threads) {
227  x->terminate();
228  }
229  XScopedLock<XCondition> lock(m_condition);
230  m_condition.broadcast();
231 }
232 void
233 XRawStreamRecordReader::join() {
234  for(auto &&x: m_threads) {
235  x->waitFor();
236  }
237 }
238 
239 void
240 XRawStreamRecordReader::onPlayCondChanged(const Snapshot &shot, XValueNodeBase *) {
241  Snapshot shot_this( *this);
242  double ms = 1.0;
243  if(shot_this[ *m_speed].to_str() == SPEED_FASTEST) ms = 0.1;
244  if(shot_this[ *m_speed].to_str() == SPEED_FAST) ms = 10.0;
245  if(shot_this[ *m_speed].to_str() == SPEED_NORMAL) ms = 30.0;
246  if(shot_this[ *m_speed].to_str() == SPEED_SLOW) ms = 100.0;
247  if( !shot_this[ *m_fastForward] && !shot_this[ *m_rewind]) ms = 0;
248  if(shot_this[ *m_rewind]) ms = -ms;
249  m_periodicTerm = ms;
250  XScopedLock<XCondition> lock(m_condition);
251  m_condition.broadcast();
252 }
253 void
254 XRawStreamRecordReader::onStop(const Snapshot &shot, XTouchableNode *) {
255  m_periodicTerm = 0;
256  g_statusPrinter->printMessage(i18n("Stopped"));
257  iterate_commit([=](Transaction &tr){
258  tr[ *m_fastForward] = false;
259  tr[ *m_rewind] = false;
260  tr.unmark(m_lsnPlayCond);
261  });
262 }
263 void
264 XRawStreamRecordReader::onFirst(const Snapshot &shot, XTouchableNode *) {
265  if(m_pGFD) {
266  try {
267  m_filemutex.lock();
268  first_(m_pGFD);
269  parseOne(m_pGFD, m_filemutex);
270  g_statusPrinter->printMessage(i18n("First"));
271  }
272  catch (XRecordError &e) {
273  m_filemutex.unlock();
274  e.print(i18n("No Record, because "));
275  }
276  }
277 }
278 void
279 XRawStreamRecordReader::onNext(const Snapshot &shot, XTouchableNode *) {
280  if(m_pGFD) {
281  try {
282  m_filemutex.lock();
283  parseOne(m_pGFD, m_filemutex);
284  g_statusPrinter->printMessage(i18n("Next"));
285  }
286  catch (XRecordError &e) {
287  m_filemutex.unlock();
288  e.print(i18n("No Record, because "));
289  }
290  }
291 }
292 void
293 XRawStreamRecordReader::onBack(const Snapshot &shot, XTouchableNode *) {
294  if(m_pGFD) {
295  try {
296  m_filemutex.lock();
297  previous_(m_pGFD);
298  previous_(m_pGFD);
299  parseOne(m_pGFD, m_filemutex);
300  g_statusPrinter->printMessage(i18n("Previous"));
301  }
302  catch (XRecordError &e) {
303  m_filemutex.unlock();
304  e.print(i18n("No Record, because "));
305  }
306  }
307 }
308 
309 void *XRawStreamRecordReader::execute(const atomic<bool> &terminated) {
310  while( !terminated) {
311  double ms = 0.0;
312  {
313  XScopedLock<XCondition> lock(m_condition);
314  while((fabs((ms = m_periodicTerm)) < 1e-4) && !terminated)
315  m_condition.wait();
316  }
317 
318  if(terminated) break;
319 
320  try {
321  m_filemutex.lock();
322  if(ms < 0.0) {
323  previous_(m_pGFD);
324  previous_(m_pGFD);
325  }
326  parseOne(m_pGFD, m_filemutex);
327  }
328  catch (XNoDriverError &e) {
329  m_filemutex.unlock();
330  e.print(i18n("No such driver :") + e.name);
331  }
332  catch (XRecordError &e) {
333  m_periodicTerm = 0.0;
334  iterate_commit([=](Transaction &tr){
335  tr[ *m_fastForward] = false;
336  tr[ *m_rewind] = false;
337  tr.unmark(m_lsnPlayCond);
338  });
339  m_filemutex.unlock();
340  e.print(i18n("No Record, because "));
341  }
342 
343  msecsleep(lrint(fabs(ms)));
344  }
345  return NULL;
346 }

Generated for KAME4 by  doxygen 1.8.3