recorder.cpp
1 /***************************************************************************
2  Copyright (C) 2002-2015 Kentaro Kitagawa
3  kitagawa@phys.s.u-tokyo.ac.jp
4 
5  This program is free software; you can redistribute it and/or
6  modify it under the terms of the GNU Library General Public
7  License as published by the Free Software Foundation; either
8  version 2 of the License, or (at your option) any later version.
9 
10  You should have received a copy of the GNU Library General
11  Public License and a list of authors along with this program;
12  see the files COPYING and AUTHORS.
13 ***************************************************************************/
14 //---------------------------------------------------------------------------
15 
16 #include "recorder.h"
17 #include "analyzer.h"
18 #include "primarydriver.h"
19 #include "xtime.h"
20 
21 #include <zlib.h>
22 #include <vector>
23 
24 //---------------------------------------------------------------------------
25 #define OFSMODE std::ios::out | std::ios::app | std::ios::ate
26 
27 XRawStream::XRawStream(const char *name, bool runtime, const shared_ptr<XDriverList> &driverlist)
28  : XNode(name, runtime),
29  m_drivers(driverlist),
30  m_pGFD(0),
31  m_filename(create<XStringNode>("Filename", true)) {
32 }
33 XRawStream::~XRawStream() {
34  if(m_pGFD) gzclose(static_cast<gzFile>(m_pGFD));
35 }
36 
37 XRawStreamRecorder::XRawStreamRecorder(const char *name, bool runtime, const shared_ptr<XDriverList> &driverlist)
38  : XRawStream(name, runtime, driverlist),
39  m_recording(create<XBoolNode>("Recording", true)) {
40 
41  iterate_commit([=](Transaction &tr){
42  tr[ *recording()] = false;
43  m_lsnOnOpen = tr[ *filename()].onValueChanged().connectWeakly(
44  shared_from_this(), &XRawStreamRecorder::onOpen);
45  m_lsnOnFlush = tr[ *recording()].onValueChanged().connectWeakly(
46  shared_from_this(), &XRawStreamRecorder::onFlush);
47  });
48  m_drivers->iterate_commit([=](Transaction &tr){
49  m_lsnOnCatch = tr[ *m_drivers].onCatch().connect( *this, &XRawStreamRecorder::onCatch);
50  m_lsnOnRelease = tr[ *m_drivers].onRelease().connect( *this, &XRawStreamRecorder::onRelease);
51  });
52 }
53 void
54 XRawStreamRecorder::onCatch(const Snapshot &shot, const XListNodeBase::Payload::CatchEvent &e) {
55  auto driver = static_pointer_cast<XDriver>(e.caught);
56  driver->iterate_commit([=](Transaction &tr){
57  if(m_lsnOnRecord)
58  tr[ *driver].onRecord().connect(m_lsnOnRecord);
59  else
60  m_lsnOnRecord = tr[ *driver].onRecord().connectWeakly(
61  shared_from_this(), &XRawStreamRecorder::onRecord);
62  });
63 }
64 void
65 XRawStreamRecorder::onRelease(const Snapshot &shot, const XListNodeBase::Payload::ReleaseEvent &e) {
66  auto driver = static_pointer_cast<XDriver>(e.released);
67  driver->iterate_commit([=](Transaction &tr){
68  tr[ *driver].onRecord().disconnect(m_lsnOnRecord);
69  });
70 }
71 void
72 XRawStreamRecorder::onOpen(const Snapshot &shot, XValueNodeBase *) {
73  if(m_pGFD) gzclose(static_cast<gzFile>(m_pGFD));
74  m_pGFD = gzopen(QString(( **filename())->to_str()).toLocal8Bit().data(), "wb");
75 }
76 void
77 XRawStreamRecorder::onFlush(const Snapshot &shot, XValueNodeBase *) {
78  if( !***recording())
79  if(m_pGFD) {
80  m_filemutex.lock();
81  gzflush(static_cast<gzFile>(m_pGFD), Z_FULL_FLUSH);
82  m_filemutex.unlock();
83  }
84 }
85 void
86 XRawStreamRecorder::onRecord(const Snapshot &shot, XDriver *d) {
87  if( ***recording() && m_pGFD) {
88  auto *driver = dynamic_cast<XPrimaryDriver*>(d);
89  if(driver) {
90  const XPrimaryDriver::RawData &rawdata(shot[ *driver].rawData());
91  uint32_t size = rawdata.size();
92  if(size) {
93  uint32_t headersize =
94  sizeof(uint32_t) //allsize
95  + sizeof(int32_t) //time().sec()
96  + sizeof(int32_t); //time().usec()
97  // size of raw record wrapped by header and footer
98  uint32_t allsize =
99  headersize
100  + driver->getName().size() //name of driver
101  + 2 //two null chars
102  + size //rawData
103  + sizeof(uint32_t); //allsize
105  header.push((uint32_t)allsize);
106  header.push((int32_t)shot[ *driver].time().sec());
107  header.push((int32_t)shot[ *driver].time().usec());
108  assert(header.size() == headersize);
109 
110  m_filemutex.lock();
111  gzwrite(static_cast<gzFile>(m_pGFD), &header[0], header.size());
112  gzprintf(static_cast<gzFile>(m_pGFD), "%s", (const char*)driver->getName().c_str());
113  gzputc(static_cast<gzFile>(m_pGFD), '\0');
114  gzputc(static_cast<gzFile>(m_pGFD), '\0'); //Reserved
115  gzwrite(static_cast<gzFile>(m_pGFD), &rawdata[0], size);
116  header.clear(); //using as a footer.
117  header.push((uint32_t)allsize);
118  gzwrite(static_cast<gzFile>(m_pGFD), &header[0], header.size());
119  m_filemutex.unlock();
120  }
121  }
122  }
123 }
124 
125 
126 XTextWriter::XTextWriter(const char *name, bool runtime,
127  const shared_ptr<XDriverList> &driverlist, const shared_ptr<XScalarEntryList> &entrylist)
128  : XNode(name, runtime),
129  m_drivers(driverlist),
130  m_entries(entrylist),
131  m_filename(create<XStringNode>("Filename", true)),
132  m_lastLine(create<XStringNode>("LastLine", true)),
133  m_recording(create<XBoolNode>("Recording", true)),
134  m_logFilename(create<XStringNode>("LogFilename", false)),
135  m_logRecording(create<XBoolNode>("LogRecording", false)),
136  m_logEvery(create<XUIntNode>("LogEvery", false)) {
137 
138  iterate_commit([=](Transaction &tr){
139  tr[ *recording()] = false;
140  tr[ *lastLine()].setUIEnabled(false);
141  tr[ *logRecording()] = false;
142  tr[ *logEvery()] = 300;
143  m_lsnOnFilenameChanged = tr[ *filename()].onValueChanged().connectWeakly(
144  shared_from_this(), &XTextWriter::onFilenameChanged);
145  m_lsnOnLogFilenameChanged = tr[ *logFilename()].onValueChanged().connectWeakly(
146  shared_from_this(), &XTextWriter::onLogFilenameChanged);
147  });
148  m_drivers->iterate_commit([=](Transaction &tr){
149  m_lsnOnCatch = tr[ *m_drivers].onCatch().connect( *this, &XTextWriter::onCatch);
150  m_lsnOnRelease = tr[ *m_drivers].onRelease().connect( *this, &XTextWriter::onRelease);
151  });
152 }
153 void
154 XTextWriter::onCatch(const Snapshot &shot, const XListNodeBase::Payload::CatchEvent &e) {
155  auto driver = static_pointer_cast<XDriver>(e.caught);
156  driver->iterate_commit([=](Transaction &tr){
157  if(m_lsnOnRecord)
158  tr[ *driver].onRecord().connect(m_lsnOnRecord);
159  else
160  m_lsnOnRecord = tr[ *driver].onRecord().connectWeakly(
161  shared_from_this(), &XTextWriter::onRecord);
162  });
163 }
164 void
165 XTextWriter::onRelease(const Snapshot &shot, const XListNodeBase::Payload::ReleaseEvent &e) {
166  auto driver = static_pointer_cast<XDriver>(e.released);
167  driver->iterate_commit([=](Transaction &tr){
168  tr[ *driver].onRecord().disconnect(m_lsnOnRecord);
169  });
170 }
171 void
172 XTextWriter::onLastLineChanged(const Snapshot &shot, XValueNodeBase *) {
173  XScopedLock<XRecursiveMutex> lock(m_filemutex);
174  if(m_stream.good()) {
175  m_stream << shot[ *lastLine()].to_str()
176  << std::endl;
177  }
178 }
179 void
180 XTextWriter::onRecord(const Snapshot &shot, XDriver *driver) {
181  Snapshot shot_this( *this);
182  XScopedLock<XRecursiveMutex> lock(m_logFilemutex);
183  XTime logtime = XTime::now();
184  XString logline;
185  bool record_log = shot_this[ *logRecording()] &&
186  (logtime - m_loggedTime > shot_this[ *logEvery()]);
187  if(shot_this[ *recording()] || record_log) {
188  if(shot[ *driver].time()) {
189  for(;;) {
190  Snapshot shot_entries( *m_entries);
191  if( !shot_entries.size())
192  break;
193  const XNode::NodeList &entries_list( *shot_entries.list());
194  //logger
195  if(record_log) {
196  m_loggedTime = logtime;
197  for(auto it = entries_list.begin(); it != entries_list.end(); it++) {
198  auto entry = static_pointer_cast<XScalarEntry>( *it);
199  logline.append(shot_entries[ *entry->value()].to_str() + KAME_DATAFILE_DELIMITER);
200  }
201  logline.append(m_loggedTime.getTimeFmtStr("%Y/%m/%d %H:%M:%S"));
202  }
203  if( !shot_this[ *recording()])
204  break;
205  //triggered writer
206  bool triggered = false;
207  for(auto it = entries_list.begin(); it != entries_list.end(); it++) {
208  auto entry = static_pointer_cast<XScalarEntry>( *it);
209  if( !shot_entries[ *entry->store()]) continue;
210  shared_ptr<XDriver> d(entry->driver());
211  if( !d) continue;
212  if((d.get() == driver) && shot[ *entry].isTriggered()) {
213  triggered = true;
214  break;
215  }
216  }
217  if( !triggered)
218  break;
219  Transaction tr_entries(shot_entries);
220  XString buf;
221  for(auto it = entries_list.begin(); it != entries_list.end(); it++) {
222  auto entry = static_pointer_cast<XScalarEntry>( *it);
223  if( !shot_entries[ *entry->store()]) continue;
224  entry->storeValue(tr_entries);
225  buf.append(shot_entries[ *entry->value()].to_str() + KAME_DATAFILE_DELIMITER);
226  }
227  buf.append(shot[ *driver].time().getTimeFmtStr("%Y/%m/%d %H:%M:%S"));
228  if(tr_entries.commit()) {
229  trans( *lastLine()) = buf;
230  break;
231  }
232  }
233  }
234  }
235  if(record_log) {
236  if(m_logStream.good()) {
237  m_logStream << logline
238  << std::endl;
239  }
240  }
241 }
242 
243 void
244 XTextWriter::onFilenameChanged(const Snapshot &shot, XValueNodeBase *) {
245  XScopedLock<XRecursiveMutex> lock(m_filemutex);
246  if(m_stream.is_open()) m_stream.close();
247  m_stream.clear();
248  m_stream.open((const char*)QString(shot[ *filename()].to_str()).toLocal8Bit().data(), OFSMODE);
249 
250  if(m_stream.good()) {
251  iterate_commit([=](Transaction &tr){
252  m_lsnOnFlush = tr[ *recording()].onValueChanged().connectWeakly(
253  shared_from_this(), &XTextWriter::onFlush);
254  m_lsnOnLastLineChanged = tr[ *lastLine()].onValueChanged().connectWeakly(
255  shared_from_this(), &XTextWriter::onLastLineChanged);
256  });
257  lastLine()->setUIEnabled(true);
258 
259  XString buf;
260  buf = "#";
261  Snapshot shot_entries( *m_entries);
262  if(shot_entries.size()) {
263  const XNode::NodeList &entries_list( *shot_entries.list());
264  for(auto it = entries_list.begin(); it != entries_list.end(); it++) {
265  auto entry = static_pointer_cast<XScalarEntry>( *it);
266  if( !shot_entries[ *entry->store()]) continue;
267  buf.append(entry->getLabel());
268  buf.append(KAME_DATAFILE_DELIMITER);
269  }
270  }
271  buf.append("Date" KAME_DATAFILE_DELIMITER "Time" KAME_DATAFILE_DELIMITER "msec");
272  trans( *lastLine()) = buf;
273  }
274  else {
275  m_lsnOnFlush.reset();
276  m_lsnOnLastLineChanged.reset();
277  lastLine()->setUIEnabled(false);
278  gErrPrint(i18n("Failed to open file."));
279  }
280 }
281 void
282 XTextWriter::onFlush(const Snapshot &shot, XValueNodeBase *) {
283  lastLine()->setUIEnabled( ***recording());
284  if( !***recording()) {
285  XScopedLock<XRecursiveMutex> lock(m_filemutex);
286  if(m_stream.good())
287  m_stream.flush();
288  }
289 }
290 
291 void
292 XTextWriter::onLogFilenameChanged(const Snapshot &shot, XValueNodeBase *) {
293  XScopedLock<XRecursiveMutex> lock(m_logFilemutex);
294  if(m_logStream.is_open()) m_logStream.close();
295  m_logStream.clear();
296  m_logStream.open((const char*)QString(shot[ *logFilename()].to_str()).toLocal8Bit().data(), OFSMODE);
297 
298  if(m_logStream.good()) {
299  XString buf;
300  buf = "#";
301  Snapshot shot_entries( *m_entries);
302  if(shot_entries.size()) {
303  const XNode::NodeList &entries_list( *shot_entries.list());
304  for(auto it = entries_list.begin(); it != entries_list.end(); it++) {
305  auto entry = static_pointer_cast<XScalarEntry>( *it);
306  buf.append(entry->getLabel());
307  buf.append(KAME_DATAFILE_DELIMITER);
308  }
309  }
310  buf.append("Date" KAME_DATAFILE_DELIMITER "Time" KAME_DATAFILE_DELIMITER "msec");
311  m_logStream << buf
312  << std::endl;
313  }
314  else {
315  gWarnPrint(i18n("All-entry logger: Failed to open file."));
316  }
317 }

Generated for KAME4 by  doxygen 1.8.3