Python* API Reference for Intel® Data Analytics Acceleration Library 2020 Update 1

compression_online.py

1 # file: compression_online.py
2 #===============================================================================
3 # Copyright 2014-2020 Intel Corporation
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #===============================================================================
17 
18 #
19 # ! Content:
20 # ! Python example of compression in the online processing mode
21 # !
22 # !*****************************************************************************
23 
24 #
25 
26 
27 #
28 
29 import os
30 import sys
31 
32 if sys.version[0] == '2':
33  import Queue as Queue
34 else:
35  import queue as Queue
36 
37 import numpy as np
38 
39 from daal.data_management import Compressor_Zlib, Decompressor_Zlib, level9, DecompressionStream, CompressionStream
40 
41 utils_folder = os.path.realpath(os.path.abspath(os.path.dirname(os.path.dirname(__file__))))
42 if utils_folder not in sys.path:
43  sys.path.insert(0, utils_folder)
44 from utils import getCRC32, readTextFile
45 
# Relative path of the input data set used by the example
datasetFileName = os.path.join('..', 'data', 'online', 'logitboost_train.csv')

# FIFO queue that carries compressed data blocks from the sending part
# of the example to the receiving part
sendReceiveQueue = Queue.Queue()

# Largest number of elements taken from the input stream in one block (16384)
maxDataBlockSize = 16 * 1024
# Number of elements in the buffer used for each read from a decompression stream
userDefinedBlockSize = 7000
54 
def getDataBlock(sentDataStream, availableDataSize, blockSize=None):
    """Return the next block of the input stream and the size left after it.

    Args:
        sentDataStream: Sliceable array exposing a ``size`` attribute
            (the script passes the array returned by ``readTextFile``).
        availableDataSize: Number of trailing elements of ``sentDataStream``
            that have not been handed out yet.
        blockSize: Largest number of elements to return in one block.
            Defaults to the module-level ``maxDataBlockSize``.

    Returns:
        A ``(block, remaining)`` tuple, where ``block`` is a slice of
        ``sentDataStream`` and ``remaining`` is the value to pass as
        ``availableDataSize`` on the next call.  ``(None, None)`` is returned
        once no data is left.

    Raises:
        ValueError: If the effective block size is not positive.
    """
    if blockSize is None:
        # Looked up at call time so the module-level setting stays the default
        blockSize = maxDataBlockSize
    if blockSize <= 0:
        # A non-positive block size would yield empty slices forever
        raise ValueError("blockSize must be positive")

    if availableDataSize <= 0:
        return (None, None)

    # The unread data is the tail of the stream, so its start is size - available
    cur_pos = sentDataStream.size - availableDataSize
    chunkSize = min(availableDataSize, blockSize)
    return (sentDataStream[cur_pos:cur_pos + chunkSize], availableDataSize - chunkSize)
64 
65 
def sendDataBlock(block):
    """Put a copy of a compressed block on the send/receive queue.

    Args:
        block: Array holding one compressed block.

    The block is copied before it is queued, so the queued array is a
    separate object from ``block``.
    """
    # NOTE(review): the copy presumably protects the queued data from later
    # reuse of the compression stream's internal buffers - confirm.
    sendReceiveQueue.put(np.copy(block))
70 
71 
def receiveDataBlock():
    """Take the next compressed block off the send/receive queue.

    Returns:
        A copy of the block at the head of the queue, or ``None`` when the
        queue holds no more blocks.
    """
    if not sendReceiveQueue.empty():
        # Hand back a copy of the queued block
        return np.copy(sendReceiveQueue.get())
    # Queue drained: tell the caller there is nothing left to receive
    return None
78 
79 
def printCRC32(sentDataStream, receivedDataStream):
    """Print the checksums of both streams and whether the transfer succeeded.

    Args:
        sentDataStream: The full input data; must expose ``size``.
        receivedDataStream: The full decompressed data; must expose ``size``.

    The size comparison takes precedence: a CRC mismatch is only reported
    when both streams have the same size.
    """
    sentChecksum = getCRC32(sentDataStream)
    receivedChecksum = getCRC32(receivedDataStream)

    print("\nCompression example program results:\n")
    print("Input data checksum: 0x{:02X}".format(sentChecksum))
    print("Received data checksum: 0x{:02X}".format(receivedChecksum))

    # Pick the single verdict line, then print it once
    if sentDataStream.size != receivedDataStream.size:
        verdict = "ERROR: Received data size mismatches with the sent data size"
    elif sentChecksum != receivedChecksum:
        verdict = "ERROR: Received data CRC mismatches with the sent data CRC"
    else:
        verdict = "OK: Received data CRC matches with the sent data CRC"
    print(verdict)
97 
98 
if __name__ == "__main__":
    # Script flow: read the input file, compress it block by block and push the
    # compressed blocks to the queue, then drain the queue, decompress the
    # blocks and compare checksums of the original and the received data.

    # Read data from a file
    sentDataStream = readTextFile(datasetFileName)

    # Create and configure a compressor
    compressor = Compressor_Zlib()
    compressor.parameter.gzHeader = True
    compressor.parameter.level = level9

    # Create a stream for compression
    compressionStream = CompressionStream(compressor)

    # ---- Sending side: slice the input, compress each slice, queue the result
    (uncompressedDataBlock, availableDataSize) = getDataBlock(sentDataStream, sentDataStream.size)
    while uncompressedDataBlock is not None:
        # Put a data block to compressionStream and compress if needed
        compressionStream.push_back(uncompressedDataBlock)

        # Get access to compressed blocks stored in compressionStream
        # without an actual compressed data copy
        compressedBlocks = compressionStream.getCompressedBlocksCollection()

        # Send every compressed block currently held by compressionStream
        for i in range(compressedBlocks.size()):
            sendDataBlock(compressedBlocks[i].getArray())

        # Fetch the next data block for compression
        (uncompressedDataBlock, availableDataSize) = getDataBlock(sentDataStream, availableDataSize)

    # ---- Receiving side: drain the queue and decompress
    decompressor = Decompressor_Zlib()
    decompressor.parameter.gzHeader = True

    # Create a stream for decompression
    decompressionStream = DecompressionStream(decompressor)

    # Decompressed pieces are collected here and joined once at the end;
    # concatenating inside the loop would re-copy all data received so far on
    # every read.  The empty seed keeps the result a uint8 array even when
    # nothing is received.
    receivedChunks = [np.empty(0, dtype=np.uint8)]
    # Reusable buffer filled by each read from decompressionStream
    tmp_block = np.empty(userDefinedBlockSize, dtype=np.uint8)

    compressedDataBlock = receiveDataBlock()
    while compressedDataBlock is not None:
        # Write a received block to decompressionStream
        decompressionStream.push_back(compressedDataBlock)

        # Read from decompressionStream until it reports no more data
        while True:
            readSize = decompressionStream.copyDecompressedArray(tmp_block)
            if readSize == 0:
                break
            # tmp_block is overwritten by the next read, so keep a copy
            receivedChunks.append(tmp_block[:readSize].copy())

        # Receive next block
        compressedDataBlock = receiveDataBlock()

    # Received uncompressed data stream
    receivedDataStream = np.concatenate(receivedChunks)

    # Compute and print checksums for sentDataStream and receivedDataStream
    printCRC32(sentDataStream, receivedDataStream)

For more complete information about compiler optimizations, see our Optimization Notice.