This commit is contained in:
louiscklaw
2025-01-31 22:10:02 +08:00
parent 97df42e0d5
commit 2627562070
2852 changed files with 748727 additions and 0 deletions


@@ -0,0 +1,524 @@
"""Tests for model."""
import os
import platform
import unittest
import numpy as np
import onnx
import tensorflow as tf
import torch
import torchvision
from pkg_resources import parse_version
from neural_compressor.model import MODELS, Model
from neural_compressor.model.model import get_model_fwk_name
from neural_compressor.model.mxnet_model import MXNetModel
from neural_compressor.model.onnx_model import ONNXModel
def build_graph():
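    """Build a tiny single-Conv2D graph, using the TF 1.x API where available and falling back to tf.compat.v1 on TF 2.x."""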
try:
graph = tf.Graph()
graph_def = tf.GraphDef()
with tf.Session(graph=graph) as sess:
x = tf.placeholder(tf.float64, shape=(1, 256, 256, 1), name="x")
y = tf.constant(np.random.random((2, 2, 1, 1)), name="y")
op = tf.nn.conv2d(input=x, filter=y, strides=[1, 1, 1, 1], padding="VALID", name="op_to_store")
sess.run(tf.global_variables_initializer())
constant_graph = tf.graph_util.convert_variables_to_constants(sess, sess.graph_def, ["op_to_store"])
graph_def.ParseFromString(constant_graph.SerializeToString())
with graph.as_default():
tf.import_graph_def(graph_def, name="")
    except Exception:  # TF 2.x removed the v1 symbols used above; rebuild via tf.compat.v1
graph = tf.Graph()
graph_def = tf.compat.v1.GraphDef()
with tf.compat.v1.Session(graph=graph) as sess:
x = tf.compat.v1.placeholder(tf.float64, shape=(1, 256, 256, 1), name="x")
y = tf.compat.v1.constant(np.random.random((3, 3, 1, 1)), name="y")
op = tf.nn.conv2d(input=x, filters=y, strides=[1, 1, 1, 1], padding="VALID", name="op_to_store")
sess.run(tf.compat.v1.global_variables_initializer())
constant_graph = tf.compat.v1.graph_util.convert_variables_to_constants(
sess, sess.graph_def, ["op_to_store"]
)
graph_def.ParseFromString(constant_graph.SerializeToString())
with graph.as_default():
tf.import_graph_def(graph_def, name="")
return graph
def build_estimator():
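    """Return a model_fn for a small three-Dense-layer estimator that only supports PREDICT mode."""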
def model_fn(features, labels, mode):
logits = tf.keras.layers.Dense(12)(features)
logits = tf.keras.layers.Dense(56)(logits)
logits = tf.keras.layers.Dense(4)(logits)
output_spec = tf.estimator.EstimatorSpec(mode=tf.estimator.ModeKeys.PREDICT, predictions=logits)
return output_spec
return model_fn
def build_input_fn():
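    """Return an input_fn that feeds batches of ones through a TF v1 initializable dataset iterator."""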
def input_fun():
tf.compat.v1.disable_eager_execution()
raw_dataset = np.ones([100, 224, 224, 3], dtype=np.float32)
tf_dataset = tf.compat.v1.data.Dataset.from_tensor_slices(raw_dataset)
tf_dataset = tf_dataset.batch(1)
ds_iterator = tf_dataset.make_initializable_iterator()
iter_tensors = ds_iterator.get_next()
return iter_tensors
return input_fun
def build_keras():
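    """Build a small Keras CNN and fit it for one epoch on 100 Fashion-MNIST images."""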
from tensorflow import keras
(train_images, train_labels), (test_images, test_labels) = keras.datasets.fashion_mnist.load_data()
train_images = train_images.astype(np.float32) / 255.0
# Create Keras model
model = keras.Sequential(
[
keras.layers.InputLayer(input_shape=(28, 28), name="input"),
keras.layers.Reshape(target_shape=(28, 28, 1)),
keras.layers.Conv2D(filters=12, kernel_size=(3, 3), activation="relu"),
keras.layers.Conv2D(filters=12, kernel_size=(3, 3), activation="relu"),
keras.layers.Conv2D(filters=12, kernel_size=(3, 3), activation="relu"),
keras.layers.Conv2D(filters=12, kernel_size=(3, 3), activation="relu"),
keras.layers.Conv2D(filters=12, kernel_size=(3, 3), activation="relu"),
keras.layers.MaxPooling2D(pool_size=(2, 2)),
keras.layers.Flatten(),
keras.layers.Dense(10, activation="softmax", name="output"),
]
)
# Compile model with optimizer
opt = keras.optimizers.Adam(learning_rate=0.01)
model.compile(optimizer=opt, loss="sparse_categorical_crossentropy", metrics=["accuracy"])
    # Train model
model.fit(x={"input": train_images[0:100]}, y={"output": train_labels[0:100]}, epochs=1)
return model
class TestTensorflowModel(unittest.TestCase):
@classmethod
def tearDownClass(self):
os.remove("model_test.pb")
def test_graph(self):
graph = build_graph()
model = Model(graph)
model.input_tensor_names = ["x"]
model.output_tensor_names = ["op_to_store"]
self.assertEqual(True, isinstance(model.graph_def, tf.compat.v1.GraphDef))
self.assertEqual(model.input_node_names[0], "x")
self.assertEqual(model.output_node_names[0], "op_to_store")
model.save("model_test.pb")
model = Model("model_test.pb")
self.assertEqual(model.input_tensor_names[0], "x")
self.assertEqual(model.output_tensor_names[0], "op_to_store")
self.assertEqual(model.input_tensor[0].name, "x:0")
self.assertEqual(model.output_tensor[0].name, "op_to_store:0")
# test wrong input tensor names can't set
with self.assertRaises(AssertionError):
model.input_tensor_names = ["wrong_input"]
with self.assertRaises(AssertionError):
model.output_tensor_names = ["wrong_output"]
# test right tensor
model.input_tensor_names = ["x_1"]
model.output_tensor_names = ["op_to_store_1"]
self.assertEqual(True, isinstance(model.graph_def, tf.compat.v1.GraphDef))
def test_validate_graph_node(self):
from neural_compressor.model.tensorflow_model import validate_graph_node
graph = build_graph()
self.assertEqual(False, validate_graph_node(graph.as_graph_def(), []))
self.assertEqual(False, validate_graph_node(graph.as_graph_def(), ["test"]))
self.assertEqual(True, validate_graph_node(graph.as_graph_def(), ["x"]))
def test_estimator(self):
from neural_compressor.adaptor.tf_utils.util import get_estimator_graph
model_fn = build_estimator()
input_fn = build_input_fn()
estimator = tf.estimator.Estimator(model_fn, model_dir=None, config=None, params=None, warm_start_from=None)
with self.assertRaises(AssertionError):
graph_def = Model(estimator).graph_def
model = Model(estimator, input_fn=input_fn)
self.assertEqual(model.output_tensor_names[0], "dense_2/BiasAdd:0")
def test_ckpt(self):
mobilenet_ckpt_url = "http://download.tensorflow.org/models/mobilenet_v1_2018_02_22/mobilenet_v1_1.0_224.tgz"
dst_path = "/tmp/.neural_compressor/mobilenet_v1_1.0_224.tgz"
if platform.system().lower() == "windows":
model_path = "C:\\tmp\.neural_compressor\\mobilenet_v1_1.0_224"
else:
model_path = "./ckpt"
if not os.path.exists(dst_path):
os.system("mkdir -p /tmp/.neural_compressor && wget {} -O {}".format(mobilenet_ckpt_url, dst_path))
if not os.path.getsize(dst_path):
os.system("rm -fr {0} && wget {1} -O {0}".format(dst_path, mobilenet_ckpt_url))
os.system("mkdir -p ckpt && tar xvf {0} -C {1}".format(dst_path, model_path))
model = Model(model_path)
model.output_tensor_names = ["MobilenetV1/Predictions/Reshape_1"]
self.assertEqual(model_path, model.model_path)
self.assertGreaterEqual(len(model.input_tensor_names), 1)
self.assertEqual(len(model.output_tensor_names), 1)
graph_def = model.graph_def
self.assertEqual(True, isinstance(graph_def, tf.compat.v1.GraphDef))
model.graph_def = graph_def
os.system("rm -rf ckpt")
def test_slim(self):
tf.compat.v1.reset_default_graph()
inception_ckpt_url = "http://download.tensorflow.org/models/inception_v1_2016_08_28.tar.gz"
if platform.system().lower() == "windows":
dst_path = "C:\\tmp\\.neural_compressor\\inception_v1_2016_08_28.tar.g"
elif platform.system().lower() == "linux":
dst_path = "/tmp/.neural_compressor/slim/inception_v1_2016_08_28.tar.gz"
if platform.system().lower() == "linux":
if not os.path.exists(dst_path):
os.system("mkdir -p /tmp/.neural_compressor/slim")
os.system("wget {} -O {}".format(inception_ckpt_url, dst_path))
if not os.path.getsize(dst_path):
os.system("rm -fr {0} && wget {1} -O {0}".format(dst_path, inception_ckpt_url))
os.system("mkdir -p slim_ckpt && tar xvf {} -C slim_ckpt".format(dst_path))
if parse_version(tf.version.VERSION) > parse_version("2.0.0"):
return
model = Model("./slim_ckpt/inception_v1.ckpt")
model.name = "inception_v1"
graph_def = model.graph_def
self.assertGreaterEqual(len(model.output_node_names), 1)
self.assertGreaterEqual(len(model.input_node_names), 1)
self.assertEqual(model.model_path, "./slim_ckpt/inception_v1.ckpt")
# test net factory
from neural_compressor.model.nets_factory import TFSlimNetsFactory
factory = TFSlimNetsFactory()
from tf_slim.nets import inception
input_shape = [None, 224, 224, 3]
model_func = inception.inception_v1
arg_scope = inception.inception_v1_arg_scope
num_classes = 1001
factory.register("inceptionv1", model_func, input_shape, arg_scope, num_classes=num_classes)
os.system("rm -rf slim_ckpt")
def test_keras_h5_model(self):
if parse_version(tf.version.VERSION) < parse_version("2.3.0"):
return
keras_model = build_keras()
self.assertEqual("tensorflow", get_model_fwk_name(keras_model))
keras_model.save("./simple_model.h5")
# load from path
model = Model("./simple_model.h5")
self.assertEqual(model.model_path, "./simple_model.h5")
self.assertGreaterEqual(len(model.output_node_names), 1)
self.assertGreaterEqual(len(model.input_node_names), 1)
os.makedirs("./keras_model", exist_ok=True)
model.save("./keras_model")
os.system("rm -rf simple_model.h5")
os.system("rm -rf keras_model")
def test_keras_saved_model(self):
if parse_version(tf.version.VERSION) < parse_version("2.3.0"):
return
keras_model = build_keras()
self.assertEqual("tensorflow", get_model_fwk_name(keras_model))
model = Model(keras_model)
self.assertEqual(model.model_path, None)
self.assertGreaterEqual(len(model.output_node_names), 1)
self.assertGreaterEqual(len(model.input_node_names), 1)
keras_model.save("./simple_model")
# load from path
model = Model("./simple_model")
self.assertEqual(model.model_path, "./simple_model")
self.assertGreaterEqual(len(model.output_node_names), 1)
self.assertGreaterEqual(len(model.input_node_names), 1)
os.makedirs("./keras_model", exist_ok=True)
model.save("./keras_model")
os.system("rm -rf simple_model")
os.system("rm -rf keras_model")
def test_tf_qat_model(self):
if parse_version(tf.version.VERSION) < parse_version("2.3.0"):
return
keras_model = build_keras()
self.assertEqual("tensorflow", get_model_fwk_name(keras_model))
from neural_compressor.model.tensorflow_model import TensorflowQATModel
model = TensorflowQATModel(keras_model)
assert isinstance(model.model, tf.keras.Model)
self.assertEqual(model.model_path, None)
keras_model.save("./simple_model")
# load from path
model = TensorflowQATModel("./simple_model")
assert isinstance(model.model, tf.keras.Model)
self.assertEqual(model.model_path, "./simple_model")
model.save("./keras_model")
loaded_model = tf.keras.models.load_model("./keras_model")
assert isinstance(loaded_model, tf.keras.Model)
model.save("keras_model.h5")
loaded_model = tf.keras.models.load_model("keras_model.h5")
assert isinstance(loaded_model, tf.keras.Model)
root = model.save()
loaded_model = tf.keras.models.load_model(root)
assert isinstance(loaded_model, tf.keras.Model)
os.system("rm -rf simple_model")
os.system("rm -rf keras_model")
os.remove("keras_model.h5")
os.system("rm -rf " + root)
@unittest.skipIf(
parse_version(tf.version.VERSION) < parse_version("2.4.0") or platform.system().lower() == "windows",
"Only supports tf 2.4.0 or above",
)
def test_saved_model(self):
ssd_resnet50_ckpt_url = "http://download.tensorflow.org/models/object_detection/ssd_resnet50_v1_fpn_shared_box_predictor_640x640_coco14_sync_2018_07_03.tar.gz"
center_resnet50_saved_model_url = (
"https://tfhub.dev/tensorflow/centernet/resnet50v1_fpn_512x512/1?tf-hub-format=compressed"
)
dst_path = "/tmp/.neural_compressor/saved_model.tar.gz"
center_dst_path = "/tmp/.neural_compressor/center_saved_model.tar.gz"
if not os.path.exists(dst_path):
os.system("mkdir -p /tmp/.neural_compressor && wget {} -O {}".format(ssd_resnet50_ckpt_url, dst_path))
if not os.path.getsize(dst_path):
os.system("rm -fr {0} && wget {1} -O {0}".format(dst_path, ssd_resnet50_ckpt_url))
if not os.path.exists(center_dst_path):
os.system(
"mkdir -p /tmp/.neural_compressor && wget {} -O {}".format(
center_resnet50_saved_model_url, center_dst_path
)
)
if not os.path.getsize(center_dst_path):
os.system("rm -fr {0} && wget {1} -O {0}".format(center_dst_path, center_resnet50_saved_model_url))
os.system("tar -xvf {}".format(dst_path))
unzip_center_model = "unzip_center_model"
os.system("mkdir -p {} ".format(unzip_center_model))
os.system("tar -xvf {} -C {}".format(center_dst_path, unzip_center_model))
model = Model("ssd_resnet50_v1_fpn_shared_box_predictor_640x640_coco14_sync_2018_07_03/saved_model")
center_model = Model("unzip_center_model")
from tensorflow.python.trackable.autotrackable import AutoTrackable
assert isinstance(
model.model, AutoTrackable
), "The model getter of TensorflowSavedModelModel is not correctly run."
from tensorflow.compat.v1 import graph_util
graph_def = graph_util.convert_variables_to_constants(
sess=model.sess, input_graph_def=model.graph_def, output_node_names=model.output_node_names
)
model.graph_def = graph_def
tmp_saved_model_path = "./tmp_saved_model"
if os.path.exists(tmp_saved_model_path):
os.system("rm -rf {}".format(tmp_saved_model_path))
os.system("mkdir -p {}".format(tmp_saved_model_path))
self.assertTrue(isinstance(model.graph_def, tf.compat.v1.GraphDef))
self.assertTrue(isinstance(model.graph, tf.compat.v1.Graph))
model.save(tmp_saved_model_path)
# load again to make sure model can be loaded
model = Model(tmp_saved_model_path)
os.system("rm -rf ssd_resnet50_v1_fpn_shared_box_predictor_640x640_coco14_sync_2018_07_03")
os.system("rm -rf temp_saved_model")
os.system("rm -rf {}".format(tmp_saved_model_path))
center_graph_def = graph_util.convert_variables_to_constants(
sess=center_model.sess,
input_graph_def=center_model.graph_def,
output_node_names=center_model.output_node_names,
)
center_model.graph_def = center_graph_def
self.assertTrue(isinstance(center_model.graph_def, tf.compat.v1.GraphDef))
self.assertTrue(isinstance(center_model.graph, tf.compat.v1.Graph))
from neural_compressor.model.tensorflow_model import _get_graph_from_saved_model_v1
graph_def, input_names, output_names = _get_graph_from_saved_model_v1(unzip_center_model)
assert graph_def is not None, "Can not parse the saved model..."
from tensorflow.python.saved_model.loader_impl import parse_saved_model_with_debug_info
from neural_compressor.model.tensorflow_model import _contains_function_with_implements_attr
saved_model_proto, _ = parse_saved_model_with_debug_info(unzip_center_model)
self.assertEqual(False, _contains_function_with_implements_attr(saved_model_proto))
os.system("rm -rf unzip_center_model")
def test_tensorflow(self):
from neural_compressor.model.tensorflow_model import TensorflowBaseModel
ori_model = build_graph()
self.assertEqual("tensorflow", get_model_fwk_name(ori_model))
self.assertEqual("tensorflow", get_model_fwk_name(TensorflowBaseModel(ori_model)))
        with self.assertRaises(AssertionError):
            get_model_fwk_name([])
        with self.assertRaises(AssertionError):
            get_model_fwk_name("./model.pb")
def export_onnx_model(model, path):
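    """Export a PyTorch model to ONNX at the given path with opset 11 and a dynamic batch dimension."""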
x = torch.randn(100, 3, 224, 224, requires_grad=True)
torch_out = model(x)
torch.onnx.export(
model,
x,
path,
export_params=True,
opset_version=11,
do_constant_folding=True,
input_names=["input"],
output_names=["output"],
dynamic_axes={"input": {0: "batch_size"}, "output": {0: "batch_size"}},
)
class TestONNXModel(unittest.TestCase):
cnn_export_path = "cnn.onnx"
cnn_model = torchvision.models.quantization.resnet18()
@classmethod
def setUpClass(self):
export_onnx_model(self.cnn_model, self.cnn_export_path)
self.cnn_model = onnx.load(self.cnn_export_path)
@classmethod
def tearDownClass(self):
os.remove(self.cnn_export_path)
def test_model(self):
self.assertEqual("onnxruntime", get_model_fwk_name(self.cnn_export_path))
model = MODELS["onnxruntime"](self.cnn_model)
self.assertEqual(True, isinstance(model, ONNXModel))
self.assertEqual(True, isinstance(model.model, onnx.ModelProto))
model.save("test.onnx")
self.assertEqual(True, os.path.exists("test.onnx"))
os.remove("test.onnx")
class TestPyTorchModel(unittest.TestCase):
def testPyTorch(self):
import torchvision
from neural_compressor.model.torch_model import IPEXModel, PyTorchFXModel, PyTorchModel
ori_model = torchvision.models.mobilenet_v2()
self.assertEqual("pytorch", get_model_fwk_name(ori_model))
pt_model = PyTorchModel(ori_model)
pt_model.model = ori_model
pt_model = PyTorchModel(torchvision.models.mobilenet_v2())
with self.assertRaises(AssertionError):
pt_model.workspace_path = "./pytorch"
ipex_model = IPEXModel(ori_model)
self.assertTrue(ipex_model.model)
ipex_model.model = ori_model
        ipex_model = IPEXModel(torchvision.models.mobilenet_v2())
with self.assertRaises(AssertionError):
ipex_model.workspace_path = "./pytorch"
ipex_model.save("./")
self.assertEqual("pytorch", get_model_fwk_name(PyTorchModel(ori_model)))
self.assertEqual("pytorch", get_model_fwk_name(IPEXModel(ori_model)))
self.assertEqual("pytorch", get_model_fwk_name(PyTorchFXModel(ori_model)))
def load_mxnet_model(symbol_file, param_file):
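    """Load an MXNet symbol and parameters; only 'arg' entries are collected, so aux_params stays empty."""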
import mxnet as mx
symbol = mx.sym.load(symbol_file)
save_dict = mx.nd.load(param_file)
arg_params = {}
aux_params = {}
for k, v in save_dict.items():
tp, name = k.split(":", 1)
if tp == "arg":
arg_params[name] = v
return symbol, arg_params, aux_params
class TestMXNetModel(unittest.TestCase):
@classmethod
def setUpClass(self):
if platform.system().lower() == "windows":
            raise unittest.SkipTest("MXNet is not supported on Windows yet")
import mxnet as mx
import mxnet.gluon.nn as nn
net = nn.HybridSequential()
net.add(nn.Dense(128, activation="relu"))
net.add(nn.Dense(64, activation="relu"))
net.add(nn.Dense(10))
net.initialize()
net.hybridize()
fake_data = mx.random.uniform(shape=(1, 128, 128))
net(fake_data)
self.net = net
@classmethod
def tearDownClass(self):
os.remove("test-symbol.json")
os.remove("test-0000.params")
os.remove("test2-symbol.json")
os.remove("test2-0000.params")
def test_model(self):
import mxnet as mx
self.assertEqual("mxnet", get_model_fwk_name(self.net))
model = MODELS["mxnet"](self.net)
self.assertEqual(True, isinstance(model, MXNetModel))
self.assertEqual(True, isinstance(model.model, mx.gluon.HybridBlock))
model.save("./test")
self.assertEqual(True, os.path.exists("test-symbol.json"))
self.assertEqual(True, os.path.exists("test-0000.params"))
net = load_mxnet_model("test-symbol.json", "test-0000.params")
model.model = net
self.assertEqual(True, isinstance(model.model[0], mx.symbol.Symbol))
model.save("./test2")
self.assertEqual(True, os.path.exists("test2-symbol.json"))
self.assertEqual(True, os.path.exists("test2-0000.params"))
if __name__ == "__main__":
unittest.main()


@@ -0,0 +1,172 @@
import os
import unittest
import torch
import torchvision
from packaging.version import Version
import neural_compressor.adaptor.pytorch as nc_torch
from neural_compressor import PostTrainingQuantConfig, quantization
from neural_compressor.adaptor.torch_utils.model_wrapper import WeightOnlyLinear
from neural_compressor.model import MODELS
from neural_compressor.model import Model as INCModel
from neural_compressor.model.torch_model import PyTorchModel
try:
import intel_pytorch_extension as ipex
TEST_IPEX = True
except Exception:  # the ipex import can fail with more than ImportError (e.g., missing native libraries)
TEST_IPEX = False
PT_VERSION = nc_torch.get_torch_version()
if PT_VERSION >= Version("1.8.0-rc1"):
FX_MODE = True
else:
FX_MODE = False
class Model(torch.nn.Module):
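    """Toy three-layer fully connected network used for weight-only quantization tests."""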
def __init__(self):
super(Model, self).__init__()
self.fc1 = torch.nn.Linear(30, 40)
self.fc2 = torch.nn.Linear(40, 30)
self.fc3 = torch.nn.Linear(30, 5)
def forward(self, x):
out = self.fc1(x)
out = self.fc2(out)
out = self.fc3(out)
return out
class TestPytorchModel(unittest.TestCase):
framework = "pytorch"
model = torchvision.models.quantization.resnet18()
lpot_model = MODELS["pytorch"](model)
def test_Model(self):
model = torchvision.models.quantization.resnet18()
inc_model = INCModel(model)
self.assertTrue(isinstance(inc_model, PyTorchModel))
def test_get_all_weight_name(self):
assert len(list(self.lpot_model.get_all_weight_names())) == 62
def test_get_weight(self):
for name, param in self.model.named_parameters():
if name == "layer4.1.conv2.weight":
param.data.fill_(0.0)
if name == "fc.bias":
param.data.fill_(0.1)
assert int(torch.sum(self.lpot_model.get_weight("layer4.1.conv2.weight"))) == 0
assert torch.allclose(torch.sum(torch.tensor(self.lpot_model.get_weight("fc.bias"))), torch.tensor(100.0))
def test_get_input(self):
model = MODELS["pytorch"](torchvision.models.quantization.resnet18())
model.model.eval().fuse_model()
model.register_forward_pre_hook()
rand_input = torch.rand(100, 3, 256, 256).float()
model.model(rand_input)
assert torch.equal(model.get_inputs("x"), rand_input)
model.remove_hooks()
def test_update_weights(self):
self.lpot_model.update_weights("fc.bias", torch.zeros([1000]))
assert int(torch.sum(self.lpot_model.get_weight("fc.bias"))) == 0
def test_gradient(self):
with self.assertRaises(AssertionError):
self.lpot_model.get_gradient("fc.bias")
shape = None
for name, tensor in self.lpot_model._model.named_parameters():
if name == "fc.bias":
shape = tensor.shape
tensor.grad = torch.randn(shape)
break
new_grad = torch.zeros(shape)
self.lpot_model.update_gradient("fc.bias", new_grad)
assert torch.equal(torch.tensor(self.lpot_model.get_gradient("fc.bias")), torch.zeros(shape))
rand_input = torch.rand(100, 3, 256, 256).float()
rand_input.grad = torch.ones_like(rand_input)
assert torch.equal(torch.tensor(self.lpot_model.get_gradient(rand_input)), torch.ones_like(rand_input))
def test_report_sparsity(self):
df, total_sparsity = self.lpot_model.report_sparsity()
self.assertTrue(total_sparsity > 0)
self.assertTrue(len(df) == 22)
def test_WeightOnlyLinear(self):
model = Model()
input = torch.randn(1, 30)
conf = PostTrainingQuantConfig(
approach="weight_only",
)
q_model = quantization.fit(model, conf)
out1 = q_model(input)
q_model.save("saved")
model_size1 = os.path.getsize("saved/best_model.pt") / 1024
print("FP32 Model size:{:.3f}M".format(model_size1))
        # test packing weights into int8/int16/int32/int64 containers
compression_dtype = [torch.int8, torch.int16, torch.int32, torch.int64]
for dtype in compression_dtype:
new_model = Model()
inc_model = INCModel(new_model)
compressed_model = inc_model.export_compressed_model(
qweight_config_path="saved/qconfig.json",
compression_dtype=dtype,
scale_dtype=torch.float32,
use_optimum_format=False,
)
            out2 = compressed_model(input)
torch.save(compressed_model.state_dict(), "saved/tmp.pt")
model_size2 = os.path.getsize("saved/tmp.pt") / 1024
print("WeightOnlyLinear Model size:{:.3f}M".format(model_size2))
self.assertTrue(isinstance(compressed_model.fc1, WeightOnlyLinear))
self.assertTrue(compressed_model.fc1.qweight.dtype == dtype)
self.assertTrue(compressed_model.fc1.scales.dtype == torch.float32)
self.assertTrue(model_size1 / model_size2 > 2)
self.assertTrue(torch.all(torch.isclose(out1, out2, atol=5e-1)))
        # test compression along dimension 0 and dimension 1
compress_dims = [0, 1]
for dim in compress_dims:
new_model = Model()
inc_model = INCModel(new_model)
compressed_model = inc_model.export_compressed_model(
qweight_config_path="saved/qconfig.json",
compression_dim=dim,
use_optimum_format=False,
)
            out2 = compressed_model(input)
torch.save(compressed_model.state_dict(), "saved/tmp.pt")
model_size2 = os.path.getsize("saved/tmp.pt") / 1024
print("WeightOnlyLinear Model size:{:.3f}M".format(model_size2))
self.assertTrue(isinstance(compressed_model.fc1, WeightOnlyLinear))
if dim == 1:
self.assertTrue(compressed_model.fc1.qweight.shape[1] != compressed_model.fc1.in_features)
else:
self.assertTrue(compressed_model.fc1.qweight.shape[0] != compressed_model.fc1.out_features)
self.assertTrue(model_size1 / model_size2 > 2)
self.assertTrue(torch.all(torch.isclose(out1, out2, atol=5e-1)))
# test half dtype
new_model = Model()
inc_model = INCModel(new_model)
compressed_model = inc_model.export_compressed_model(
qweight_config_path="saved/qconfig.json",
)
        out2 = compressed_model(input)
torch.save(compressed_model.state_dict(), "saved/tmp.pt")
model_size2 = os.path.getsize("saved/tmp.pt") / 1024
print("WeightOnlyLinear Model size:{:.3f}M".format(model_size2))
self.assertTrue(isinstance(compressed_model.fc1, WeightOnlyLinear))
self.assertTrue(compressed_model.fc1.scales.dtype == torch.float16)
self.assertTrue(model_size1 / model_size2 > 2)
self.assertTrue(torch.all(torch.isclose(out1, out2, atol=5e-1)))
if __name__ == "__main__":
unittest.main()


@@ -0,0 +1,475 @@
import os
import shutil
import subprocess
import sys
import unittest
import numpy as np
import onnx
from onnx import TensorProto, helper, numpy_helper
from packaging.version import Version
from neural_compressor import PostTrainingQuantConfig, quantization
from neural_compressor.adaptor.pytorch import get_torch_version
from neural_compressor.data import DATALOADERS, Datasets
from neural_compressor.model.onnx_model import ONNXModel
PT_VERSION = get_torch_version().release
def get_onnx_model():
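    """Export a torchvision ResNet-18 to resnet18.onnx in the working directory."""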
import torch
import torchvision
from torch.autograd import Variable
model = torchvision.models.resnet18()
x = Variable(torch.randn(1, 3, 224, 224))
    torch.onnx.export(model, x, "resnet18.onnx", export_params=True, verbose=True)
def generate_input_initializer(tensor_shape, tensor_dtype, input_name):
"""Helper function to generate initializers for test inputs."""
tensor = np.random.ranf(tensor_shape).astype(tensor_dtype)
init = numpy_helper.from_array(tensor, input_name)
return init
class TestOnnxModel(unittest.TestCase):
def setUp(self):
# Relu
# | \
# Conv \
# | \
# Relu |
# | Conv
# Conv /
# \ /
# |
# Add
input0 = helper.make_tensor_value_info("input0", TensorProto.FLOAT, [1, 3, 1, 3])
output = helper.make_tensor_value_info("output", TensorProto.FLOAT, [1, 3, 1, 3])
X1_weight = generate_input_initializer([3, 3, 1, 1], np.float32, "X1_weight")
X1_bias = generate_input_initializer([3], np.float32, "X1_bias")
X3_weight = generate_input_initializer([3, 3, 1, 1], np.float32, "X3_weight")
X3_bias = generate_input_initializer([3], np.float32, "X3_bias")
X5_weight = generate_input_initializer([3, 3, 1, 1], np.float32, "X5_weight")
X5_bias = generate_input_initializer([3], np.float32, "X5_bias")
relu_node_1 = onnx.helper.make_node("Relu", ["input0"], ["X1"], name="Relu1")
conv_node_1 = onnx.helper.make_node("Conv", ["X1", "X1_weight", "X1_bias"], ["X2"], name="Conv1")
relu_node_2 = onnx.helper.make_node("Relu", ["X2"], ["X3"], name="Relu2")
conv_node_2 = onnx.helper.make_node("Conv", ["X3", "X3_weight", "X3_bias"], ["X4"], name="Conv2")
conv_node_3 = onnx.helper.make_node("Conv", ["X1", "X5_weight", "X5_bias"], ["X5"], name="Conv3")
add_node = onnx.helper.make_node("Add", ["X4", "X5"], ["output"], name="Add")
graph = helper.make_graph(
[relu_node_1, conv_node_1, relu_node_2, conv_node_2, conv_node_3, add_node],
"test_graph_6",
[input0],
[output],
)
graph.initializer.add().CopyFrom(X1_weight)
graph.initializer.add().CopyFrom(X1_bias)
graph.initializer.add().CopyFrom(X3_weight)
graph.initializer.add().CopyFrom(X3_bias)
graph.initializer.add().CopyFrom(X5_weight)
graph.initializer.add().CopyFrom(X5_bias)
model = helper.make_model(graph)
test_model_path = "./test_model_6.onnx"
onnx.save(model, test_model_path)
model = onnx.load(test_model_path)
self.model = ONNXModel(model)
# QuantizeLinear
# |
# QLinearConv
# |
# DequantizeLinear
A = helper.make_tensor_value_info("A", TensorProto.FLOAT, [1, 1, 5, 5])
A_scale = helper.make_tensor_value_info("A_scale", TensorProto.FLOAT, [1])
a_scale = generate_input_initializer([1], np.float32, "A_scale")
A_zero = helper.make_tensor_value_info("A_zero_point", TensorProto.INT8, [1])
a_zero_point = generate_input_initializer([1], np.int8, "A_zero_point")
B_scale = helper.make_tensor_value_info("B_scale", TensorProto.FLOAT, [1])
b_scale = generate_input_initializer([1], np.float32, "B_scale")
B_zero = helper.make_tensor_value_info("B_zero_point", TensorProto.INT8, [1])
b_zero_point = generate_input_initializer([1], np.int8, "B_zero_point")
C = helper.make_tensor_value_info("C", TensorProto.INT8, [1, 1, 5, 5])
c = generate_input_initializer([1, 1, 5, 5], np.int8, "C")
C_scale = helper.make_tensor_value_info("C_scale", TensorProto.FLOAT, [1])
c_scale = generate_input_initializer([1], np.float32, "C_scale")
C_zero = helper.make_tensor_value_info("C_zero_point", TensorProto.INT8, [1])
c_zero_point = generate_input_initializer([1], np.int8, "C_zero_point")
E = helper.make_tensor_value_info("E", TensorProto.INT32, [1])
e = generate_input_initializer([1], np.int32, "E")
D_scale = helper.make_tensor_value_info("D_scale", TensorProto.FLOAT, [1])
d_scale = generate_input_initializer([1], np.float32, "D_scale")
D_zero = helper.make_tensor_value_info("D_zero_point", TensorProto.INT8, [1])
d_zero_point = generate_input_initializer([1], np.int8, "D_zero_point")
D = helper.make_tensor_value_info("D", TensorProto.FLOAT, [1, 1, 5, 5])
quantize_node = onnx.helper.make_node(
"QuantizeLinear", ["A", "A_scale", "A_zero_point"], ["B_quantized"], name="A_QuantizeLinear"
)
conv_node = onnx.helper.make_node(
"QLinearConv",
[
"B_quantized",
"B_scale",
"B_zero_point",
"C_quantized",
"C_scale",
"C_zero_point",
"D_scale",
"D_zero_point",
"E",
],
["D_quantized"],
name="conv_quant",
kernel_shape=[3, 3],
pads=[1, 1, 1, 1],
)
dequantize_node = onnx.helper.make_node(
"DequantizeLinear", ["D_quantized", "D_scale", "D_zero_point"], ["D"], name="D_DequantizeLinear"
)
graph = helper.make_graph(
[quantize_node, conv_node, dequantize_node],
"test_graph_7",
[A, A_scale, A_zero, C, C_scale, C_zero, E, D_scale, D_zero],
[D],
)
graph.initializer.add().CopyFrom(a_scale)
graph.initializer.add().CopyFrom(a_zero_point)
graph.initializer.add().CopyFrom(b_scale)
graph.initializer.add().CopyFrom(b_zero_point)
graph.initializer.add().CopyFrom(c)
graph.initializer.add().CopyFrom(c_scale)
graph.initializer.add().CopyFrom(c_zero_point)
graph.initializer.add().CopyFrom(e)
graph.initializer.add().CopyFrom(d_scale)
graph.initializer.add().CopyFrom(d_zero_point)
model = helper.make_model(graph)
self.q_model = ONNXModel(model)
# MatMul
# |
# Add
# |
# Reshape
# |
# Reshape
# |
# MatMul
# |
# Add
input = onnx.helper.make_tensor_value_info("input", onnx.TensorProto.FLOAT, [2, 4])
W1 = onnx.helper.make_tensor_value_info("W1", onnx.TensorProto.FLOAT, [4, 5])
w1 = generate_input_initializer([4, 5], np.float32, "W1")
B1 = onnx.helper.make_tensor_value_info("b1", onnx.TensorProto.FLOAT, [5])
b1 = generate_input_initializer([5], np.float32, "b1")
shape = numpy_helper.from_array(np.array((2, 5)).astype(np.int64), name="shape")
W2 = onnx.helper.make_tensor_value_info("W2", onnx.TensorProto.FLOAT, [5, 6])
w2 = generate_input_initializer([5, 6], np.float32, "W2")
B2 = onnx.helper.make_tensor_value_info("b2", onnx.TensorProto.FLOAT, [6])
b2 = generate_input_initializer([6], np.float32, "b2")
output = onnx.helper.make_tensor_value_info("output", onnx.TensorProto.FLOAT, [2, 6])
node1 = onnx.helper.make_node("MatMul", inputs=["input", "W1"], outputs=["y1"])
node2 = onnx.helper.make_node("Add", inputs=["y1", "b1"], outputs=["y1_add_b1"])
node3 = onnx.helper.make_node("Reshape", inputs=["y1_add_b1", "shape"], outputs=["y2"])
node4 = onnx.helper.make_node("Reshape", inputs=["y2", "shape"], outputs=["y3"])
node5 = onnx.helper.make_node("MatMul", inputs=["y3", "W2"], outputs=["y4"])
node6 = onnx.helper.make_node("Add", inputs=["y4", "b2"], outputs=["output"])
graph = onnx.helper.make_graph(
[node1, node2, node3, node4, node5, node6], "test_matmul_reshape_graph", [input, W1, B1, W2, B2], [output]
)
graph.initializer.add().CopyFrom(w1)
graph.initializer.add().CopyFrom(b1)
graph.initializer.add().CopyFrom(w2)
graph.initializer.add().CopyFrom(b2)
graph.initializer.add().CopyFrom(shape)
model = onnx.helper.make_model(graph, **{"opset_imports": [onnx.helper.make_opsetid("", 14)]})
self.matmul_reshape_model = model
cmd = (
"optimum-cli export onnx --model hf-internal-testing/tiny-random-gptj --task text-generation --legacy gptj/"
)
p = subprocess.Popen(
cmd, preexec_fn=os.setsid, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True
) # nosec
p.communicate()
@classmethod
def tearDownClass(self):
shutil.rmtree("./gptj", ignore_errors=True)
shutil.rmtree("./hf_test", ignore_errors=True)
if os.path.exists("model.onnx"):
os.remove("model.onnx")
def test_hf_model(self):
from optimum.onnxruntime import ORTModelForCausalLM
from transformers import AutoConfig, AutoTokenizer
os.mkdir("hf_test")
model = ONNXModel("gptj/decoder_model.onnx")
model.save("./hf_test/decoder_model.onnx")
self.assertTrue(os.path.exists("hf_test/config.json"))
config = AutoConfig.from_pretrained("hf_test")
sessions = ORTModelForCausalLM.load_model("hf_test/decoder_model.onnx")
model = ORTModelForCausalLM(sessions, config, model_save_dir="hf_test", use_cache=False, use_io_binding=False)
self.assertNotEqual(model, None)
def test_nodes(self):
self.assertEqual(len(self.model.nodes()), 6)
nodes_name = [node.name for node in self.model.nodes()]
nodes = ["Relu1", "Conv1", "Relu2", "Conv2", "Conv3", "Add"]
for node in nodes:
self.assertTrue(node in nodes_name)
def test_initializer(self):
self.assertEqual(len(self.model.initializer()), 6)
inits_name = [init.name for init in self.model.initializer()]
inits = ["X1_weight", "X1_bias", "X3_weight", "X3_bias", "X5_weight", "X5_bias"]
for init in inits:
self.assertTrue(init in inits_name)
def test_remove_node(self):
for node in self.model.nodes():
if node.op_type == "Add":
self.model.remove_node(node)
self.assertEqual(len(self.model.nodes()), 5)
nodes_name = [node.name for node in self.model.nodes()]
nodes = ["Relu1", "Conv1", "Relu2", "Conv2", "Conv3"]
for node in nodes:
self.assertTrue(node in nodes_name)
def test_remove_nodes(self):
nodes_to_remove = []
for node in self.model.nodes():
if node.name == "Conv3" or node.name == "Add":
nodes_to_remove.append(node)
self.model.remove_nodes(nodes_to_remove)
self.assertEqual(len(self.model.nodes()), 4)
nodes_name = [node.name for node in self.model.nodes()]
nodes = ["Relu1", "Conv1", "Relu2", "Conv2"]
for node in nodes:
self.assertTrue(node in nodes_name)
def test_add_node(self):
node_to_add = onnx.helper.make_node("Relu", ["output"], ["output1"], keepdims=0)
self.model.add_node(node_to_add)
last_node = self.model.nodes()[-1]
self.assertEqual(last_node.op_type, "Relu")
def test_add_nodes(self):
nodes_to_add = []
for i in range(2):
node_to_add = onnx.helper.make_node(
"Relu", ["add_node{}_input".format(str(i))], ["add_node{}_output".format(str(i))], keepdims=0
)
nodes_to_add.append(node_to_add)
self.model.add_nodes(nodes_to_add)
self.assertEqual(self.model.nodes()[-1].input, ["add_node1_input"])
self.assertEqual(self.model.nodes()[-2].input, ["add_node0_input"])
self.assertEqual(self.model.nodes()[-1].output, ["add_node1_output"])
self.assertEqual(self.model.nodes()[-2].output, ["add_node0_output"])
def test_get_initializer(self):
inits = ["X1_weight", "X1_bias", "X3_weight", "X3_bias", "X5_weight", "X5_bias"]
for init in inits:
self.assertIsNotNone(self.model.get_initializer(init))
def test_remove_initializer(self):
for init in self.model.initializer():
if init.name == "X1_weight":
self.model.remove_initializer(init)
self.assertEqual(len(self.model.initializer()), 5)
inits_name = [init.name for init in self.model.initializer()]
inits = ["X1_bias", "X3_weight", "X3_bias", "X5_weight", "X5_bias"]
for init in inits:
self.assertTrue(init in inits_name)
def test_remove_initializers(self):
init_to_remove = []
for init in self.model.initializer():
if "bias" in init.name:
init_to_remove.append(init)
self.model.remove_initializers(init_to_remove)
self.assertEqual(len(self.model.initializer()), 3)
inits_name = [init.name for init in self.model.initializer()]
inits = ["X1_weight", "X3_weight", "X5_weight"]
for init in inits:
self.assertTrue(init in inits_name)
def test_input_name_to_nodes(self):
self.assertEqual(len(self.model.input_name_to_nodes), 12)
ipts_name = [name for name in self.model.input_name_to_nodes]
ipts = ["input0", "X1", "X2", "X3", "X3_weight", "X3_bias", "X5_weight", "X5_bias", "X4", "X5"]
for ipt in ipts:
self.assertTrue(ipt in ipts_name)
def test_output_name_to_node(self):
self.assertEqual(len(self.model.output_name_to_node), 6)
opts_name = [name for name in self.model.output_name_to_node]
opts = ["X1", "X2", "X3", "X4", "X5", "output"]
for opt in opts:
self.assertTrue(opt in opts_name)
def test_get_siblings(self):
for node in self.model.nodes():
if node.name == "Conv1":
siblings = self.model.get_siblings(node)
self.assertEqual(len(siblings), 1)
siblings_name = [sibling.name for sibling in siblings]
names = ["Conv3"]
for name in names:
self.assertTrue(name in siblings_name)
def test_get_children(self):
for node in self.model.nodes():
if node.name == "Relu1":
children = self.model.get_children(node)
self.assertEqual(len(children), 2)
children_name = [child.name for child in children]
names = ["Conv1", "Conv3"]
for name in names:
self.assertTrue(name in children_name)
def test_get_parents(self):
for node in self.model.nodes():
if node.op_type == "Add":
parents = self.model.get_parents(node)
self.assertEqual(len(parents), 2)
parents_name = [parent.name for parent in parents]
names = ["Conv2", "Conv3"]
for name in names:
self.assertTrue(name in parents_name)
def test_get_parent(self):
for node in self.model.nodes():
if node.op_type == "Add":
node_to_get_parent = node
parent = self.model.get_parent(node, 0)
self.assertEqual(parent.name, "Conv2")
parent = self.model.get_parent(node, 1)
self.assertEqual(parent.name, "Conv3")
parent = self.model.get_parent(node, 2)
self.assertIsNone(parent)
def test_find_nodes_by_initializer(self):
for init in self.model.initializer():
if init.name == "X1_weight":
initializer = init
nodes = self.model.find_nodes_by_initializer(self.model.graph(), initializer)
self.assertEqual(len(nodes), 1)
self.assertEqual(nodes[0].name, "Conv1")
def test_get_scale_zero(self):
import time
result = [0.1]
def sub_eval(model, result):
time.sleep(0.001 * len(result))
return result[0]
def eval(model):
return sub_eval(model, result)
dataset = Datasets("onnxrt_qdq")["dummy"]((4, 4), low=0.0, high=0.0, dtype="float32")
dataloader = DATALOADERS["onnxrt_qdq"](dataset, 2)
config = PostTrainingQuantConfig()
q_model = quantization.fit(self.matmul_reshape_model, config, calib_dataloader=dataloader, eval_func=eval)
q_model.save("test.onnx")
scale, zp = q_model.get_scale_zero("y3_QuantizeInput_quantized")
self.assertEqual(scale.name, "y1_add_b1_scale")
self.assertEqual(zp.name, "y1_add_b1_zero_point")
scale, zp = q_model.get_scale_zero("input_quantized")
self.assertEqual(scale.name, "input_scale")
self.assertEqual(zp.name, "input_zero_point")
def test_save(self):
self.model.save_model_to_file("./test_model_6.onnx", use_external_data_format=True)
def test_find_by_name(self):
from neural_compressor.adaptor.ox_utils.util import dtype_mapping, dtype_to_name, find_by_name
initializer = find_by_name("X1_weight", self.model.initializer())
self.assertIsNotNone(initializer)
initializer = find_by_name("X1", self.model.initializer())
self.assertIsNone(initializer)
def test_remove_unused_nodes(self):
self.assertEqual(len(self.model.nodes()), 6)
node_to_add = onnx.helper.make_node("Relu", ["output1"], ["output2"], keepdims=0, name="added_relu")
self.model.add_node(node_to_add)
self.assertEqual(len(self.model.nodes()), 7)
self.model.remove_unused_nodes()
self.assertEqual(len(self.model.nodes()), 6)
# TODO: follow https://github.com/onnx/neural-compressor/pull/40
    @unittest.skipIf(PT_VERSION >= Version("2.5.0").release, "Requires a PyTorch version lower than 2.5")
def test_check_large_model(self):
import onnx
import torch
import torch.nn as nn
from neural_compressor.model.onnx_model import ONNXModel
class Net(nn.Module):
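            """Single-Linear module sized to land above or below the 2GB ONNX protobuf limit."""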
def __init__(self, in_features, out_features):
super(Net, self).__init__()
self.fc = nn.Linear(in_features, out_features)
def forward(self, x):
x = self.fc(x)
return x
# model > 2GB
model = Net(512, 1024 * 1024)
input = torch.randn(512, requires_grad=True)
with torch.no_grad():
torch.onnx.export(model, (input,), "model.onnx", do_constant_folding=True, opset_version=13)
model = onnx.load("model.onnx")
model = ONNXModel(model) # pass ModelProto
model.check_is_large_model()
self.assertTrue(model.is_large_model)
model = ONNXModel("model.onnx") # pass string
model.check_is_large_model()
self.assertTrue(model.is_large_model)
model = onnx.load("model.onnx", load_external_data=False) # not load init
model = ONNXModel(model)
model.check_is_large_model()
self.assertTrue(model.is_large_model)
# model < 2GB
model = Net(10, 10 * 10)
input = torch.randn(10, requires_grad=True)
with torch.no_grad():
torch.onnx.export(model, (input,), "model.onnx", do_constant_folding=True, opset_version=13)
model = onnx.load("model.onnx")
model = ONNXModel(model) # pass ModelProto
model.check_is_large_model()
self.assertFalse(model.is_large_model)
model = ONNXModel("model.onnx") # pass string
model.check_is_large_model()
self.assertFalse(model.is_large_model)
model = ONNXModel("model.onnx", load_external_data_for_model=False) # not load init
model.check_is_large_model()
self.assertFalse(model.is_large_model)
if __name__ == "__main__":
unittest.main()


@@ -0,0 +1,52 @@
#
# -*- coding: utf-8 -*-
#
import os
import platform
import unittest
from neural_compressor.adaptor.tensorflow import TensorFlowAdaptor
from neural_compressor.model import Model as TensorflowModel
from neural_compressor.model.tensorflow_model import validate_graph_node
class TestTFAutoDetectInputOutput(unittest.TestCase):
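    """Check that input/output node names are auto-detected from a frozen MobileNet pb."""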
mb_model_url = (
"https://storage.googleapis.com/intel-optimized-tensorflow/models/v1_6/mobilenet_v1_1.0_224_frozen.pb"
)
pb_path = "/tmp/.neural_compressor/mobilenet_fp32.pb"
platform = platform.system().lower()
if platform == "windows":
pb_path = "C:\\tmp\\.neural_compressor\\mobilenet_fp32.pb"
@classmethod
def setUpClass(self):
self.saved_flag = True
if not os.path.exists(self.pb_path):
try:
if self.platform == "linux":
os.system(
"mkdir -p /tmp/.neural_compressor && wget {} -O {} ".format(self.mb_model_url, self.pb_path)
)
elif self.platform == "windows":
os.system("md C:\\tmp\.neural_compressor && cd C:\\tmp\.neural_compressor")
from urllib import request
request.urlretrieve(self.mb_model_url)
except Exception as e:
self.saved_flag = False
def testAutoDetectInputOutput(self):
if self.saved_flag:
model = TensorflowModel(self.pb_path)
outputs = model.output_node_names
inputs = model.input_node_names
output_validate = validate_graph_node(model.graph_def, outputs)
self.assertTrue(output_validate)
input_validate = validate_graph_node(model.graph_def, inputs)
self.assertTrue(input_validate)
if __name__ == "__main__":
unittest.main()