GEVClient receiver
Introduction
Image processing and computer vision applications depend on efficient handling of image data. The Python GEVClient receives and processes images from GigE Vision (GEV) servers and is well suited to prototyping. This guide covers how to use GEVClient to receive images, process them, and manage image stacks. It also discusses how to configure GEV servers to optimize image transmission and to integrate with Python applications.
The Python client is a convenient way to explore the capabilities of GEVClient when developing image analysis systems or handling large volumes of image data. For production use, however, the C++ GEVClient is recommended.
Prerequisite
Before executing the Python GEV client, ensure that the mvIMPACT library is installed.
You can follow this tutorial to complete the installation.
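Before starting the client, it can help to confirm that the package is importable. The following is a minimal sketch using only the standard library; the helper name is_module_available is hypothetical and not part of the SDK.

```python
import importlib.util


def is_module_available(name: str) -> bool:
    """Return True if a top-level module called `name` can be located."""
    return importlib.util.find_spec(name) is not None


# 'mvIMPACT' is the package imported by the example listing
if not is_module_available("mvIMPACT"):
    print("mvIMPACT not found: complete the installation before running the client")
```

The check only locates the module; it does not verify that a device or driver is present.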
Complete example code
Example code in Python
gev_client_receiver.py |
---|
| import os, platform, sys
# Import the Impact Acquire SDK into the current scope
from mvIMPACT import acquire
# Import the helper functions for the SDK examples, such as 'conditionalSetProperty'.
# To use this module in your own code, make sure the 'Common' folder resides in a sub-folder of your project.
from mvIMPACT.Common import exampleHelper
from enum import Enum
# For systems with NO mvDisplay library support
import ctypes
from PIL import Image
import numpy
import threading


class GEVClient(threading.Thread):
    def __init__(self, name, image_dir) -> None:
        threading.Thread.__init__(self)
        self.name = name
        self.image_base_dir = image_dir
        self._stop_event = threading.Event()

    def run(self):
        print(f"Thread {self.name}: starting")
        self.connect()

    def stop(self):
        print(f"Thread {self.name}: stopping")
        self._stop_event.set()
        self.fi.acquisitionStop()

    def connect(self):
        devMgr = acquire.DeviceManager()
        # Adjust the device index to the one that you want to use
        self.pDev = devMgr[0]
        if self.pDev is None:
            exampleHelper.requestENTERFromUser()
            sys.exit(-1)
        self.pDev.open()
        print("Please enter the number of images to capture followed by [ENTER]: ", end='')
        self.framesToCapture = exampleHelper.getNumberFromUser()
        if self.framesToCapture < 1:
            print("Invalid input! Please capture at least one image")
            sys.exit(-1)
        self.fi = acquire.FunctionInterface(self.pDev)
        self.statistics = acquire.Statistics(self.pDev)
        # Queue requests until the driver reports that no more buffers are available
        while self.fi.imageRequestSingle() == acquire.DMR_NO_ERROR:
            print("Buffer queued")
        self.pPreviousRequest = None
        print('Connected to device !')
        # Uncomment to set the acquisition parameters if necessary
        # self.set_acquisition_parameters()
        self.fi.acquisitionStart()
        print('Acquisition started ! Ready to receive images...')
        self.image_acquisition_loop()

    def set_acquisition_parameters(self):
        '''
        Sets the acquisition parameters for the device.
        This method initializes an acquisition property and binds it to a
        specific component of the device.
        In this case the layer selection is set to predefined layer indexes.
        '''
        acq_property = acquire.PropertyS()
        locator = acquire.DeviceComponentLocator(self.pDev, acquire.dltSetting, "Base")
        locator.bindComponent(acq_property, "taskImaging.layerSelection.indexes.Values")
        current_layers = "0,10,20,30"
        acq_property.write(current_layers)

    def image_acquisition_loop(self):
        """
        Continuously acquires images from a device, processes them, and saves them to disk.
        This function runs a loop to capture a specified number of frames from a device. For each frame:
        - It waits for an image request to be ready.
        - If the request is valid, it retrieves the buffer parts and processes them.
        - If the pixel format of a buffer part is 22, it converts the buffer to a numpy array and saves it as an image.
        - If the pixel format is not 22, it extracts metadata from the buffer.
        - It prints the metadata for every frame and the statistics every 100 frames.
        - It manages the request lifecycle by unlocking the previous request and issuing a new image request.
        Raises:
            Exception: If the image buffer has an unsupported channel count.
        Note:
        - The function assumes that the device and necessary configurations are already set up.
        - The function handles specific pixel formats and may need adjustments for other formats.
        - The function does NOT include error handling for image request timeouts and device-specific issues.
        - Existing image files in the target directory are overwritten.
        """
        idx = 0
        pixel_format = 22
        # Metadata dictionary - do NOT change the key order since it is set by the server
        meta_data = {"frame_id": 0, "seconds": 0, "microseconds": 0, "measurement_id": 0, "duration_ms": 0}
        for frame in range(self.framesToCapture):
            # Leave the loop early once stop() has been called
            if self._stop_event.is_set():
                break
            requestNr = self.fi.imageRequestWaitFor(10000)
            if self.fi.isRequestNrValid(requestNr):
                pRequest: acquire.Request = self.fi.getRequest(requestNr)
                if pRequest.isOK:
                    if frame % 100 == 0:
                        print(f'''Info from {self.pDev.serial.read()}:
                        {self.statistics.framesPerSecond.name()}: {self.statistics.framesPerSecond.readS()},
                        {self.statistics.errorCount.name()}: {self.statistics.errorCount.readS()},
                        {self.statistics.captureTime_s.name()}: {self.statistics.captureTime_s.readS()}''')
                    bufferPartCount = pRequest.getBufferPartCount()
                    meta_data_key_iterator = iter(meta_data.keys())
                    for part in range(bufferPartCount):
                        bufferPart: acquire.BufferPart = pRequest.getBufferPart(part)
                        imageBufferDesc: acquire.ImageBufferDesc = bufferPart.getImageBufferDesc()
                        imageBuffer: acquire.ImageBuffer = imageBufferDesc.getBuffer()
                        cbuf = (ctypes.c_char * imageBuffer.iSize).from_address(int(imageBuffer.vpData))
                        if imageBuffer.pixelFormat == pixel_format:
                            print(f'''Image Buffer Information:
                            Height: {imageBuffer.iHeight},
                            Width: {imageBuffer.iWidth},
                            Channel Count: {imageBuffer.iChannelCount},
                            Buffer Size: {imageBuffer.iSize},
                            Pixel Format: {imageBuffer.pixelFormat}''')
                            channelType = numpy.uint8
                            arr: numpy.ndarray = numpy.frombuffer(cbuf, dtype=channelType)
                            if imageBuffer.iChannelCount == 1:
                                # Pillow expects a 2-D array for single-channel images
                                img = Image.fromarray(arr.reshape(imageBuffer.iHeight, imageBuffer.iWidth))
                            elif imageBuffer.iChannelCount == 3:
                                img = Image.fromarray(arr.reshape(imageBuffer.iHeight, imageBuffer.iWidth, 3), 'RGB')
                            else:
                                raise Exception("Unsupported channel count")
                            img_dir_path = f'{self.image_base_dir}/image_{idx}/'
                            # If the directory does not exist, create it
                            if not os.path.exists(img_dir_path):
                                os.makedirs(img_dir_path)
                            img_dir_path = f'{img_dir_path}/image_{part:03d}.png'
                            img.save(img_dir_path)
                            # Convert the image to tensor
                            # img_tensor = numpy.array(img)
                        else:
                            arr: numpy.ndarray = numpy.frombuffer(cbuf, dtype=numpy.uint64)
                            # Copy the first value of the buffer to the next key of the meta_data dictionary
                            meta_data[next(meta_data_key_iterator)] = int(arr[0])
                    print(f"Meta Data: {meta_data}")
                    idx += 1
                if self.pPreviousRequest is not None:
                    self.pPreviousRequest.unlock()
                self.pPreviousRequest = pRequest
                self.fi.imageRequestSingle()
            else:
                # Please note that slow systems or interface technologies in combination with high resolution sensors
                # might need more time to transmit an image than the timeout value which has been passed to imageRequestWaitFor().
                # If this is the case simply wait multiple times OR increase the timeout (not recommended as usually not necessary
                # and potentially makes the capture thread less responsive) and rebuild this application.
                # Once the device is configured for triggered image acquisition and the timeout elapsed before
                # the device has been triggered this might happen as well.
                # The return code would be -2119 (DEV_WAIT_FOR_REQUEST_FAILED) in that case, the documentation will provide
                # additional information under TDMR_ERROR in the interface reference.
                # If waiting with an infinite timeout (-1) it will be necessary to call 'imageRequestReset' from another thread
                # to force 'imageRequestWaitFor' to return when no data is coming from the device/can be captured.
                # print("imageRequestWaitFor failed (" + str(requestNr) + ", " + acquire.ImpactAcquireException.getErrorCodeAsString(requestNr) + ")")
                pass


if __name__ == "__main__":
    # Set data_base_dir to the directory where the images should be saved.
    # The directory is created if it does not exist.
    # Be aware that existing data in the directory gets overwritten.
    data_base_dir = 'data/test/'
    client_thread = GEVClient("ABS-PyGEVClient", data_base_dir)
    client_thread.start()
    # Wait for the client thread to finish.
    # The thread finishes once the requested number of frames has been processed.
    # Note that every call to imageRequestWaitFor(10000) uses up one loop iteration,
    # so a request that times out also reduces the number of frames that get captured.
    client_thread.join()
|
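The stop method in the listing sets a threading.Event, which a capture loop can poll in order to exit before the requested number of frames is reached. The following is a minimal sketch of that pattern using only the standard library. FrameWorker is a hypothetical stand-in for the GEV client, and the timed wait merely simulates waiting for a frame.

```python
import threading


class FrameWorker(threading.Thread):
    """Hypothetical stand-in for the client: counts 'frames' until stopped."""

    def __init__(self, frames_to_capture: int, period_s: float = 0.01) -> None:
        super().__init__()
        self.frames_to_capture = frames_to_capture
        self.period_s = period_s
        self.processed = 0
        self._stop_event = threading.Event()

    def stop(self) -> None:
        # Signal the loop in run() to exit at the next opportunity
        self._stop_event.set()

    def run(self) -> None:
        for _ in range(self.frames_to_capture):
            # wait() returns True as soon as stop() has been called; otherwise
            # it times out after period_s, which simulates waiting for a frame
            if self._stop_event.wait(self.period_s):
                break
            self.processed += 1


worker = FrameWorker(frames_to_capture=1000)
worker.start()
worker.stop()
worker.join()
print(f"processed {worker.processed} frames before stopping")
```

Polling the event once per iteration keeps the loop responsive without forcing the thread to terminate in the middle of handling a frame.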
Adjustable content
-
Adjust data_base_dir in the __main__ block to the directory where the images should be saved.
-
[Optional] Adjust the acquisition parameters on startup:
-
To configure the device at startup, set the required parameters in the set_acquisition_parameters function (its call in connect is commented out by default). This example selects the layers within the container. Additional parameters can be set by binding them via their respective paths, which can be looked up in the ICC (Impact Control Center).
| def set_acquisition_parameters(self):
    acq_property = acquire.PropertyS()
    locator = acquire.DeviceComponentLocator(self.pDev, acquire.dltSetting, "Base")
    locator.bindComponent(acq_property, "taskImaging.layerSelection.indexes.Values")
    current_layers = "0,10,20,30"
    acq_property.write(current_layers)
|
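The layer selection is written as a comma-separated string. When the indexes are computed rather than typed by hand, a small helper can build that string. This is only a sketch: format_layer_indexes is a hypothetical name, and the sketch assumes the property accepts plain integers joined by commas, as in the value "0,10,20,30" used above.

```python
def format_layer_indexes(indexes) -> str:
    """Join integer layer indexes into a comma-separated string."""
    return ",".join(str(int(i)) for i in indexes)


# Every tenth layer from 0 up to (but not including) 40
current_layers = format_layer_indexes(range(0, 40, 10))
print(current_layers)  # 0,10,20,30
```

The resulting string can then be passed to acq_property.write in place of the hard-coded value.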
-
GenDC
- The loop over the buffer parts in image_acquisition_loop (pRequest.getBufferPartCount() and pRequest.getBufferPart()) interprets the container sent by the GEVServer. For a detailed description of the container and its components, refer to the chapter Generic Data Container (GenDC), where its structure and functionality are outlined in depth.
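In the listing, every buffer part that is not an image is read as unsigned 64-bit integers, and its first value is assigned to the next metadata key in a fixed order. The sketch below reproduces that mapping on simulated byte buffers with the standard library only. The function name decode_metadata and the little-endian byte order are assumptions; the listing itself relies on the platform's native byte order via numpy.uint64.

```python
import struct

# Key order as used by the meta_data dictionary in the listing
META_KEYS = ("frame_id", "seconds", "microseconds", "measurement_id", "duration_ms")


def decode_metadata(parts):
    """Map the first uint64 of each non-image part to META_KEYS, in order."""
    if len(parts) > len(META_KEYS):
        raise ValueError("more metadata parts than known keys")
    meta = dict.fromkeys(META_KEYS, 0)
    for key, raw in zip(META_KEYS, parts):
        # "<Q" = little-endian unsigned 64-bit integer (assumed byte order)
        (meta[key],) = struct.unpack_from("<Q", raw, 0)
    return meta


# Simulated payloads standing in for the buffers delivered by the server
parts = [struct.pack("<Q", v) for v in (7, 1700000000, 250000, 3, 12)]
print(decode_metadata(parts))
```

Keys without a matching part keep their default value of 0, which mirrors how the dictionary is initialised in the listing.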
-
Storing data on disk or further processing
- This code stores image containers in separate folders by default. While this setup typically doesn't require modification, it can be adjusted if necessary to suit specific organizational needs.
- If you prefer to process the data directly within the application without storing it on disk, comment out the lines that create the directory and save the image. A starting point for this approach is the commented-out img_tensor line at the end of the snippet below, which converts the image into a NumPy array (tensor).
| img_dir_path = f'{self.image_base_dir}/image_{idx}/'
# If the directory does not exist, create it
if not os.path.exists(img_dir_path):
    os.makedirs(img_dir_path)
img_dir_path = f'{img_dir_path}/image_{part:03d}.png'
img.save(img_dir_path)
# Convert the image to tensor
# img_tensor = numpy.array(img)
|
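The folder layout used above (one folder per container, one PNG per buffer part) can also be expressed with pathlib. This is a sketch of an alternative and not the code of the listing; part_path is a hypothetical helper, and a temporary directory stands in for the real output directory.

```python
from pathlib import Path
import tempfile


def part_path(base_dir, container_idx: int, part: int) -> Path:
    """Return <base_dir>/image_<container_idx>/image_<part>.png, creating the folder."""
    folder = Path(base_dir) / f"image_{container_idx}"
    # exist_ok=True avoids a separate existence check before creating the folder
    folder.mkdir(parents=True, exist_ok=True)
    return folder / f"image_{part:03d}.png"


with tempfile.TemporaryDirectory() as tmp:
    target = part_path(tmp, 0, 5)
    print(target.relative_to(tmp).as_posix())  # image_0/image_005.png
```

A call such as img.save(part_path(self.image_base_dir, idx, part)) would then replace the manual path handling.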
-
Run python client
- To run the application, execute the command:
python gev_client_receiver.py