Lesson 11 : ValkkaFS

As you learned from earlier lessons, you can redirect video streams to matroska (.mkv) video files.

Here we’ll be streaming video to the custom ValkkaFS filesystem.

ValkkaFS dumps video to a dedicated file, or to an entire partition or disk. Arriving H264 frames are written, in order of arrival, into a single (large) file that is organized in blocks. For more details, consult the ValkkaFS section and the cpp documentation.

Here we provide several examples for writing to and reading from ValkkaFS. These include exporting video from ValkkaFS to matroska, and caching frames from ValkkaFS and passing them downstream at play speed.

In a typical VMS application, writing and reading run concurrently: the writing thread dumps frames continuously to disk, while the reading thread is invoked only at the user’s request.

Writing

Let’s start by dumping video from IP cameras into ValkkaFS.

Download lesson [here]

This will be our filtergraph:

(LiveThread:livethread) -->> (ValkkaFSWriterThread:writerthread)

Let’s import valkka level 1 API, and ValkkaSingleFS from level 2 API:

import time
from valkka.core import *
from valkka.fs import ValkkaSingleFS
from valkka.api2 import loglevel_debug, loglevel_normal, loglevel_crazy

Let’s set our IP camera’s address:

rtsp_address="rtsp://admin:12345@192.168.0.157"

If you want to see the filesystem writing each frame, enable these debugging loggers:

#setLogLevel_filelogger(loglevel_crazy)
#setLogLevel_valkkafslogger(loglevel_crazy)

There are two flavors of ValkkaFS under the valkka.fs namespace, namely ValkkaSingleFS and ValkkaMultiFS. In the former, there is one file per camera/stream, while in the latter you can dump several streams into the same file.

The ValkkaSingleFS instance handles the metadata of the filesystem. Let’s create a new filesystem and save the metadata into the directory /tmp/testvalkkafs:

valkkafs = ValkkaSingleFS.newFromDirectory(
    dirname = "/tmp/testvalkkafs",
    blocksize = (2048//8)*1024, # note division by 8: 2048 KiloBITS
    n_blocks = 10,
    verbose = True)

Here one block holds 2048 KBits (256 KBytes) of data. Let’s suppose that your camera streams 1024 KBits per second (kbps): a block will then be finished every 2 seconds.

If you now set your IP camera to send a key frame every second, you will have two key frames per block, which is a necessary condition for efficient seeking using the filesystem.

The total size of the device file into which frames are streamed will be 256 kB * 10 = 2560 kB.

You could also skip the parameter n_blocks and instead define the device file size directly with device_size = 2560*1024.
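
The sizing arithmetic above can be checked with a few lines of plain Python. This is only a sketch: the helper name block_layout and its parameters are made up for illustration and are not part of the libValkka API.

```python
def block_layout(block_kbits, n_blocks, stream_kbps, keyframe_interval_s):
    """Derive sizing figures from the numbers used in this lesson (hypothetical helper)."""
    blocksize_bytes = (block_kbits // 8) * 1024      # KBits -> KBytes -> bytes
    seconds_per_block = block_kbits / stream_kbps    # time needed to fill one block
    return {
        "blocksize_bytes": blocksize_bytes,
        "seconds_per_block": seconds_per_block,
        "keyframes_per_block": seconds_per_block / keyframe_interval_s,
        "device_size_bytes": blocksize_bytes * n_blocks,
        "seconds_of_video": seconds_per_block * n_blocks,
    }


layout = block_layout(block_kbits=2048, n_blocks=10,
                      stream_kbps=1024, keyframe_interval_s=1.0)
for name, value in layout.items():
    print(name, value)
```

With the values of this lesson, the device file holds about 20 seconds of video at the assumed bitrate. Real cameras rarely stream at an exactly constant bitrate, so treat these figures as estimates.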

Now the directory has the following files:

blockfile           Table of block timestamps.  Used for seeking, etc.
dumpfile            Frames are streamed into this file (the "device file")
valkkafs.json       Metadata: block size, current block, etc.

Next, we create and start (1) the thread responsible for writing the frames into ValkkaFS and (2) LiveThread that is reading the cameras:

writerthread = ValkkaFSWriterThread("writer", valkkafs.core)
livethread = LiveThread("livethread")

All cameras write to the same FrameFilter, handled by the writing thread:

file_input_framefilter = writerthread.getFrameFilter()

Read camera and designate it with slot number 1

ctx = LiveConnectionContext(LiveConnectionType_rtsp, rtsp_address, 1, file_input_framefilter)

Next, start threads

writerthread.startCall()
livethread.startCall()

Frames with slot number 1 are identified in the filesystem with id number 925412 (which we just invented):

writerthread.setSlotIdCall(1, 925412)

livethread.registerStreamCall(ctx)
livethread.playStreamCall(ctx)

Idle for some secs while the threads run in the background

print("recording!")
time.sleep(3)

At this moment, let’s take a look at the blocktable

a=valkkafs.getBlockTable()
print(a)
if a.max() <= 0:
    print("Not a single block finished so far..")
    valkkafs.core.updateTable(disk_write=True)
    print("Check blocktable again")
    a=valkkafs.getBlockTable()
    if a.max() <= 0:
        print("Not a single frame so far..")
    print(a)

Frames in a block are recorded definitively in the book-keeping only once that block is finished.

In the code above, we force a block write even if the block has not filled up.

Let’s continue & let the threads do their stuff for some more time

print("recording some more")
time.sleep(30)

livethread.stopCall()
writerthread.stopCall()

Let’s take a look at the blocktable again:

a = valkkafs.getBlockTable() # fetch the table again to see the blocks written since the last call
print(a)

print("bye")

Reading 1

In the following two examples, we request frames from ValkkaFS.

Download lesson [here]

Same imports as before:

import time, sys
from valkka.core import *
from valkka.fs import ValkkaSingleFS

Load ValkkaFS metadata:

valkkafs = ValkkaSingleFS.loadFromDirectory(dirname="/tmp/testvalkkafs")

Let’s take a look at the blocktable:

a = valkkafs.getBlockTable()
print(a)

Construct the filterchain: write from the reader thread into the verbose InfoFrameFilter

out_filter = InfoFrameFilter("reader_out_filter")
readerthread = ValkkaFSReaderThread("reader", valkkafs.core, out_filter)

Start the reader thread:

readerthread.startCall()

Frames with id number 925412 are mapped into slot 1:

readerthread.setSlotIdCall(1, 925412)

Request blocks 0 and 1 from the reader thread. Information about the frames in these blocks is dumped to the terminal:

readerthread.pullBlocksPyCall([0,1])
time.sleep(1)

Exit the thread:

readerthread.stopCall()
print("bye")

Reading 2

Download lesson [here]

import time, sys
from valkka.core import *
from valkka.fs import ValkkaSingleFS

Load ValkkaFS metadata:

valkkafs = ValkkaSingleFS.loadFromDirectory(dirname="/tmp/testvalkkafs")

Let’s take a look at the blocktable:

a = valkkafs.getBlockTable()
print(a)

Instantiate ValkkaFSTool that allows us to peek into the written data

tool = ValkkaFSTool(valkkafs.core)

Contents of individual blocks can now be inspected like this:

tool.dumpBlock(0)
tool.dumpBlock(1)

You’ll get output like this:

----- Block number : 0 -----
[925412] <BasicFrame: timestamp=1543314164986 subsession_index=0 slot=0 / payload size=29 / H264: slice_type=7> *     0 0 0 1 103 100 0 31 172 17 22 160 80 5 186 16 0 1 25 64
[925412] <BasicFrame: timestamp=1543314164986 subsession_index=0 slot=0 / payload size=8 / H264: slice_type=8>    0 0 0 1 104 238 56 176
[925412] <BasicFrame: timestamp=1543314165135 subsession_index=0 slot=0 / payload size=32 / H264: slice_type=7> *     0 0 0 1 103 100 0 31 172 17 22 160 80 5 186 16 0 1 25 64
[925412] <BasicFrame: timestamp=1543314165135 subsession_index=0 slot=0 / payload size=8 / H264: slice_type=8>    0 0 0 1 104 238 56 176
[925412] <BasicFrame: timestamp=1543314165135 subsession_index=0 slot=0 / payload size=19460 / H264: slice_type=5>    0 0 0 1 101 136 128 8 0 1 191 180 142 114 29 255 192 79 52 19
[925412] <BasicFrame: timestamp=1543314165215 subsession_index=0 slot=0 / payload size=32 / H264: slice_type=7> *     0 0 0 1 103 100 0 31 172 17 22 160 80 5 186 16 0 1 25 64
[925412] <BasicFrame: timestamp=1543314165215 subsession_index=0 slot=0 / payload size=8 / H264: slice_type=8>    0 0 0 1 104 238 56 176
[925412] <BasicFrame: timestamp=1543314165215 subsession_index=0 slot=0 / payload size=19408 / H264: slice_type=5>    0 0 0 1 101 136 128 8 0 1 191 180 142 114 29 255 193 80 200 71
[925412] <BasicFrame: timestamp=1543314165335 subsession_index=0 slot=0 / payload size=4928 / H264: slice_type=1>    0 0 0 1 65 154 0 64 2 19 127 208 117 223 181 129 22 206 32 84
...

The frame id number is indicated in the first column. An asterisk (*) marks the seek points. At the end of each row, the first few bytes of the actual payload are shown.

Let’s see the min and max time of frames written in this ValkkaFS

(t0, t1) = valkkafs.getTimeRange()
print("Min and Max time in milliseconds:", t0, t1)

These are milliseconds, so to get a struct_time object we need to do this:

print("Min time:", time.gmtime(t0/1000))
print("Max time:", time.gmtime(t1/1000))
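
If you prefer a timezone-aware datetime over struct_time, the standard library can do that conversion as well. The sketch below uses a timestamp taken from the block dump shown earlier; the helper name ms_to_utc is hypothetical.

```python
from datetime import datetime, timedelta, timezone


def ms_to_utc(mstimestamp):
    """Convert a millisecond epoch timestamp to a timezone-aware UTC datetime."""
    seconds, millis = divmod(mstimestamp, 1000)
    # Add the millisecond remainder separately to avoid float rounding
    return datetime.fromtimestamp(seconds, tz=timezone.utc) + timedelta(milliseconds=millis)


stamp = ms_to_utc(1543314164986)
print(stamp.isoformat())
```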

Block numbers corresponding to a certain time range can be searched like this:

req = (t0, t1)
block_indices = valkkafs.getInd(req)
print("Block indices =", block_indices)

Now you could pass the indices to the ValkkaFSReaderThread method pullBlocksPyCall to recover all frames from that time interval.

Another useful method is getIndNeigh. It returns blocks from the neighborhood of a certain target time:

req = (t1+t0)//2
block_indices = valkkafs.getIndNeigh(n=2, time=req)
print("Block indices =", block_indices)

That will return the target block plus two blocks surrounding it.

You would call this method when a user requests a seek to a certain time and you want to be sure that there are enough frames surrounding that time instant.
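
To make the idea of a time-range search concrete, here is a small stand-alone sketch. It assumes a simplified block table in which each row holds the first and last millisecond timestamp of a block and unused blocks are zero. Both that layout and the function name blocks_in_range are assumptions made for illustration; this is not the implementation behind getInd.

```python
def blocks_in_range(table, t_start, t_end):
    """Return indices of blocks whose [first, last] interval overlaps [t_start, t_end].

    table is a list of (first_mstimestamp, last_mstimestamp) rows, one per block.
    This is a simplified, assumed layout, not the real ValkkaFS block table.
    """
    hits = []
    for index, (first, last) in enumerate(table):
        if last <= 0:
            continue  # block has never been written
        if first <= t_end and last >= t_start:
            hits.append(index)
    return hits


toy_table = [(1000, 2900), (3000, 4900), (5000, 6900), (0, 0)]
wide = blocks_in_range(toy_table, 2500, 5200)
narrow = blocks_in_range(toy_table, 3100, 3200)
print(wide, narrow)
```

A linear scan is fine for a table of this size. For a large block table, a binary search over sorted timestamps would be the more natural choice.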

Matroska export

Let’s start by recalling the very first lesson. There we saw how LiveThread sends Setup Frames at streaming initialization. Setup frames are used all over the libValkka infrastructure, to carry information about the video stream, to signal the stream start and to initialize decoders, muxers, etc.

On the other hand, ValkkaFSReaderThread is designed to be a simple beast: it does not have any notion of stream initialization. It simply provides frames on a per-block basis.

We must use a special FrameFilter called InitStreamFrameFilter, in order to add the Setup Frames into the stream.
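
The idea of injecting setup frames into a plain frame stream can be illustrated with a toy filter written in pure Python. The classes below (Collector, ToyInitStreamFilter) and the dictionary-based frames are invented for this sketch. They only mimic the concept and say nothing about how InitStreamFrameFilter is actually implemented.

```python
class Collector:
    """Terminal toy filter: stores every frame it receives."""

    def __init__(self):
        self.frames = []

    def run(self, frame):
        self.frames.append(frame)


class ToyInitStreamFilter:
    """Emit a setup frame before the first payload frame of each slot."""

    def __init__(self, next_filter):
        self.next_filter = next_filter
        self.seen_slots = set()

    def run(self, frame):
        slot = frame["slot"]
        if slot not in self.seen_slots:
            # First frame from this slot: announce the stream downstream
            self.seen_slots.add(slot)
            self.next_filter.run({"type": "setup", "slot": slot})
        self.next_filter.run(frame)


sink = Collector()
chain = ToyInitStreamFilter(sink)
for slot in (1, 1, 2):
    chain.run({"type": "h264", "slot": slot})

kinds = [(frame["type"], frame["slot"]) for frame in sink.frames]
print(kinds)
```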

Download lesson [here]

Same imports as before:

import time, sys
from valkka.core import *
from valkka.api2 import loglevel_debug, loglevel_normal
from valkka.fs import ValkkaSingleFS

setLogLevel_filelogger(loglevel_debug)

Load ValkkaFS metadata:

valkkafs = ValkkaSingleFS.loadFromDirectory(dirname="/tmp/testvalkkafs")

Let’s take a look at the blocktable:

a = valkkafs.getBlockTable()
print(a)

Next, construct the filterchain. It looks like this:

                                                                          +--> ...
main branch                                                               |
(ValkkaFSReaderThread:readerthread) --> {ForkFrameFilterN:fork_filter} ---+
                                                                          |
                                                                          +--> branch 1

branch 1 : {PassSlotFrameFilter:slot_filter} --> {InitStreamFrameFilter:init_stream} --> {FileFrameFilter:file_filter} --> output file

Here we have introduced yet another FrameFilter that performs forking. An arbitrary number of terminals can be connected to ForkFrameFilterN. Terminals can be connected and disconnected also while threads are running.

The PassSlotFrameFilter passes only frames with a certain slot number, since we want frames from a single stream only in the final output file.

# main branch
fork_filter = ForkFrameFilterN("fork")
readerthread = ValkkaFSReaderThread("reader", valkkafs.core, fork_filter)

# branch 1
file_filter = FileFrameFilter("file_filter")
init_stream = InitStreamFrameFilter("init_filter", file_filter)
slot_filter = PassSlotFrameFilter("", 1, init_stream)

# connect branch 1
fork_filter.connect("info", slot_filter)

# activate file write
file_filter.activate("kokkelis.mkv")

Start the reader thread:

readerthread.startCall()

Frames with id number 925412 are mapped into slot 1:

readerthread.setSlotIdCall(1, 925412)

Request blocks 0, 1, 3 and 4 from the reader thread (block 2 is left out in this example). Frames from these blocks pass through branch 1 and are written to the output file:

readerthread.pullBlocksPyCall([0,1,3,4])
time.sleep(1)

Exit the thread:

readerthread.stopCall()
print("bye")

Playing frames

As you learned in the previous examples of this section, ValkkaFSReaderThread pushes frames downstream in “bursts”, several blocks’ worth of frames in a single shot.

However, we also need something that passes recorded frames downstream (say, for visualization and/or for transmission) at “play speed” (say, at 25 fps).

This is achieved with FileCacheThread, which caches, seeks and passes frames downstream at play speed.

In detail, ValkkaFSReaderThread passes frames to FileCacheThread, which caches them in memory. After this, seek, play and stop can be requested from FileCacheThread, which then passes the frames downstream, starting from a seek point, at the original speed at which the frames were recorded into ValkkaFS.

FileCacheThread can be given special python callback functions that are called when the min and max time of the cached frames change and when the frame presentation time goes forward.

FileCacheThread is very similar to other threads that send streams (like LiveThread), so it also handles the sending of Setup Frames downstream correctly.
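
What “seek, then pass frames downstream at play speed” means can be sketched in a few lines of plain Python. The function play_from and the (timestamp, payload) tuples are hypothetical and exist only for this illustration. The real caching thread does considerably more, for example starting from a suitable seek point and handling several streams.

```python
import bisect
import time


def play_from(frames, seek_ms, emit, sleep=time.sleep, speed=1.0):
    """Emit cached frames from seek_ms onwards, pacing them by their timestamps.

    frames must be a list of (mstimestamp, payload) tuples sorted by time.
    """
    timestamps = [mstimestamp for mstimestamp, _ in frames]
    start = bisect.bisect_left(timestamps, seek_ms)  # first frame at or after the seek time
    previous = None
    for mstimestamp, payload in frames[start:]:
        if previous is not None:
            # Wait for the recorded gap between two consecutive frames
            sleep((mstimestamp - previous) / 1000.0 / speed)
        emit(mstimestamp, payload)
        previous = mstimestamp


cached = [(0, "f0"), (40, "f1"), (80, "f2"), (120, "f3")]  # 25 fps -> 40 ms apart
played = []
delays = []
# Record the delays instead of really sleeping, so the example finishes instantly
play_from(cached, seek_ms=40,
          emit=lambda t, p: played.append((t, p)),
          sleep=delays.append)
print(played)
print(delays)
```

Passing the sleep function as a parameter keeps the pacing logic testable without real waiting. In normal use you would leave it at its default, time.sleep.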

Download lesson [here]

Same imports as before:

import time, sys
from valkka.core import *
from valkka.api2 import loglevel_debug, loglevel_normal
from valkka.fs import ValkkaSingleFS

setLogLevel_filelogger(loglevel_debug)

Load ValkkaFS metadata:

valkkafs = ValkkaSingleFS.loadFromDirectory(dirname="/tmp/testvalkkafs")

Let’s take a look at the blocktable:

a = valkkafs.getBlockTable()
print(a)

Filterchain is going to look like this:

(ValkkaFSReaderThread:readerthread) -->> (FileCacheThread:cacherthread) --> {InfoFrameFilter:out_filter}
                                                 |
                                                 $
                                   setPyCallback  : [int] current mstime, freq: 500 ms
                                   setPyCallback2 : [tuple] (min mstimestamp, max mstimestamp)

As you can see, we have introduced new notation here.

$ designates callbacks that are used by FileCacheThread. It’s up to you to define the python code in these callbacks. The callbacks are registered by using the setPyCallback and setPyCallback2 methods.

Next, we construct the filterchain in end-to-beginning order.

ValkkaFSReaderThread will write all its frames into FileCacheThread’s input FrameFilter.

out_filter   = InfoFrameFilter("out_filter") # will be registered later with cacherthread
cacherthread = FileCacheThread("cacher")
readerthread = ValkkaFSReaderThread("reader", valkkafs.core, cacherthread.getFrameFilter()) # ValkkaFSReaderThread => FileCacheThread

Next, define callbacks for FileCacheThread

Define a global variable: a tuple that holds the min and max millisecond timestamps of cached frames:

current_time_limits = None

The following function will be called frequently by FileCacheThread to inform us about the current millisecond timestamp:

def current_time_callback(mstime: int):
    global current_time_limits
    try:
        print("current time", mstime)
        if current_time_limits is None:
            return

        if mstime >= current_time_limits[1]:
            print("current time over cached time limits!")
            # cacherthread.rewindCall() # TODO
            # # or alternatively, handle the situation as you please
            # # .. for example, request more blocks:
            # readerthread.pullBlocksPyCall(your_list_of_blocks)
            pass

    except Exception as e:
        print("current_time_callback failed with ", str(e))
        return

The next callback is invoked when FileCacheThread receives new frames for caching. It informs us about the minimum and maximum millisecond timestamps:

def time_limits_callback(times: tuple):
    global current_time_limits
    try:
        print("new time limits", times)
        current_time_limits = times

    except Exception as e:
        print("time_limits_callback failed with ", str(e))
        return

The callbacks should be kept ASAP (as-simple-as-possible) and return immediately. You might also want them to send a Qt signal in your GUI application.

Typically, they should use only the following methods of the libValkka API:

valkka.fs.ValkkaSingleFS
                    .getBlockTable
                    .getTimeRange

valkka.core.ValkkaFSReaderThread
                    .pullBlocksPyCall
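
One common way to keep callbacks this small is to let them only enqueue what they receive and to do the actual decision making elsewhere. The sketch below uses the standard-library queue module. The names on_current_time, on_time_limits and process_events are hypothetical, and the callbacks are called by hand here instead of being registered with a thread.

```python
import queue

events = queue.Queue()  # thread-safe hand-off between callbacks and application code


def on_current_time(mstime):
    """Callback body: only enqueue, then return immediately."""
    events.put(("time", mstime))


def on_time_limits(times):
    """Callback body: only enqueue, then return immediately."""
    events.put(("limits", times))


def process_events(pending):
    """Drain the queue and report (limits, last_time, out_of_cache)."""
    limits = None
    last_time = None
    while True:
        try:
            kind, value = pending.get_nowait()
        except queue.Empty:
            break
        if kind == "limits":
            limits = value
        elif kind == "time":
            last_time = value
    out_of_cache = (limits is not None
                    and last_time is not None
                    and last_time >= limits[1])
    return limits, last_time, out_of_cache


# Simulate what a caching thread might report
on_time_limits((1000, 5000))
on_current_time(4500)
on_current_time(5000)
state = process_events(events)
print(state)
```

When out_of_cache is true, the application code (not the callback itself) could then decide to request more blocks or to stop playback.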

Register the callbacks into the FileCacheThread

cacherthread.setPyCallback(current_time_callback)
cacherthread.setPyCallback2(time_limits_callback)

Start the threads

cacherthread.startCall()
readerthread.startCall()

Frames saved with id number 925412 to ValkkaFS are mapped into slot number 1:

readerthread.setSlotIdCall(1, 925412)

FileCacheThread will write frames with slot number 1 into InfoFrameFilter:

ctx = FileStreamContext(1, out_filter)
cacherthread.registerStreamCall(ctx)

Request blocks 0, 1, 3 and 4 from the reader thread (block 2 is left out in this example). The frames will be cached by FileCacheThread.

readerthread.pullBlocksPyCall([0,1,3,4])

Before frames can be played, a seek must be performed to set a time reference.

mstimestamp = int(a[1,0]) # take the first timestamp from the blocktable.  Use int() to convert to normal python integer.
print("seeking to", mstimestamp)
cacherthread.seekStreamsCall(mstimestamp)

It’s up to the API user to ensure that the mstimestamp used is within the correct limits (i.e. within the requested blocks).

Next, let the stream play for 10 seconds

cacherthread.playStreamsCall()
time.sleep(10)

Stop threads

cacherthread.stopCall()
readerthread.stopCall()

print("bye")

ValkkaFSManager

In the previous example, two callback functions which define the application’s behaviour with respect to recorded and cached frames were used.

How you define the callback functions depends completely on your application. For example, if you’re creating an application that plays back recorded streams, you might want to request new blocks in your current_time_callback once the time goes over the limits of the currently cached frames.

We are starting to get some idea of the challenges that arise when doing simultaneous reading, writing, caching and playing of a large number of (non-continuous) video streams. For more discussion of this, please see the ValkkaFS section.

To make things easier, the valkka.fs namespace has a special class, ValkkaFSManager, that handles the simultaneous & synchronous writing and playing of multiple video streams.

For an example of how to use ValkkaFSManager, please see test_studio_6.py in the PyQt testsuite.