# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import base64
import json

import numpy as np

from paddlenlp.utils.ie_utils import map_offset, pad_image_data
from paddlenlp.utils.log import logger


def reader(data_path, max_seq_len=512):
    """
    Read a JSONL dataset file and yield examples, splitting any content that does
    not fit into ``max_seq_len`` into several shorter examples with adjusted
    result offsets.
    """
    with open(data_path, "r", encoding="utf-8") as f:
        for line in f:
            json_line = json.loads(line)
            content = json_line["content"].strip()
            prompt = json_line["prompt"]
            boxes = json_line.get("bbox", None)
            image = json_line.get("image", None)
            # Model input looks like [CLS] prompt [SEP] [SEP] text [SEP] for UIE-X (4 special
            # tokens) and [CLS] prompt [SEP] text [SEP] for text-only UIE (3 special tokens).
            if boxes is not None and image is not None:
                summary_token_num = 4
            else:
                summary_token_num = 3
            if max_seq_len <= len(prompt) + summary_token_num:
                raise ValueError("The value of max_seq_len is too small, please set a larger value")
            max_content_len = max_seq_len - len(prompt) - summary_token_num
            if len(content) <= max_content_len:
                yield json_line
            else:
                result_list = json_line["result_list"]
                json_lines = []
                accumulate = 0
                while True:
                    cur_result_list = []
                    for result in result_list:
                        if result["end"] - result["start"] > max_content_len:
                            logger.warning(
                                "result['end'] - result['start'] exceeds max_content_len, which will result in no valid instance being returned"
                            )
                        if (
                            result["start"] + 1 <= max_content_len < result["end"]
                            and result["end"] - result["start"] <= max_content_len
                        ):
                            max_content_len = result["start"]
                            break

                    cur_content = content[:max_content_len]
                    res_content = content[max_content_len:]
                    if boxes is not None and image is not None:
                        cur_boxes = boxes[:max_content_len]
                        res_boxes = boxes[max_content_len:]

                    while True:
                        if len(result_list) == 0:
                            break
                        elif result_list[0]["end"] <= max_content_len:
                            if result_list[0]["end"] > 0:
                                cur_result = result_list.pop(0)
                                cur_result_list.append(cur_result)
                            else:
                                cur_result_list = [result for result in result_list]
                                break
                        else:
                            break

                    if boxes is not None and image is not None:
                        json_line = {
                            "content": cur_content,
                            "result_list": cur_result_list,
                            "prompt": prompt,
                            "bbox": cur_boxes,
                            "image": image,
                        }
                    else:
                        json_line = {
                            "content": cur_content,
                            "result_list": cur_result_list,
                            "prompt": prompt,
                        }
                    json_lines.append(json_line)

                    for result in result_list:
                        if result["end"] <= 0:
                            break
                        result["start"] -= max_content_len
                        result["end"] -= max_content_len
                    accumulate += max_content_len
                    max_content_len = max_seq_len - len(prompt) - summary_token_num
                    if len(res_content) == 0:
                        break
                    elif len(res_content) < max_content_len:
                        if boxes is not None and image is not None:
                            json_line = {
                                "content": res_content,
                                "result_list": result_list,
                                "prompt": prompt,
                                "bbox": res_boxes,
                                "image": image,
                            }
                        else:
                            json_line = {"content": res_content, "result_list": result_list, "prompt": prompt}

                        json_lines.append(json_line)
                        break
                    else:
                        content = res_content
                        # Only document examples carry boxes; guard so that text-only
                        # examples do not reference the undefined `res_boxes`.
                        if boxes is not None and image is not None:
                            boxes = res_boxes

                for json_line in json_lines:
                    yield json_line
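

# Hedged usage sketch (not part of the original file): `reader` is a plain generator, so in
# the PaddleNLP fine-tuning scripts it is normally wrapped with `load_dataset`. The file
# name "train.txt" below is only an assumed placeholder for a JSONL file in the format
# expected by `reader`.
def _load_dataset_sketch(train_path="train.txt", max_seq_len=512):
    from paddlenlp.datasets import load_dataset

    # Extra keyword arguments are forwarded to the custom read function (`reader`);
    # `lazy=False` materializes every (possibly split) example into a MapDataset.
    return load_dataset(reader, data_path=train_path, max_seq_len=max_seq_len, lazy=False)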


def convert_example(example, tokenizer, max_seq_len, pad_id=1, c_sep_id=2, summary_token_num=4):
    """
    Convert one raw example produced by ``reader`` into model-ready features for
    UIE (text-only) or UIE-X (text + bbox + image): input ids, token type ids,
    position ids, attention mask, token-level bounding boxes, the padded image and
    the start/end position labels.
    """
    content = example["content"]
    prompt = example["prompt"]
    bbox_lines = example.get("bbox", None)
    image_buff_string = example.get("image", None)
    # Text
    if bbox_lines is None or image_buff_string is None:
        encoded_inputs = tokenizer(
            text=[example["prompt"]],
            text_pair=[example["content"]],
            truncation=True,
            max_seq_len=max_seq_len,
            pad_to_max_seq_len=True,
            return_attention_mask=True,
            return_position_ids=True,
            return_offsets_mapping=True,
            return_dict=False,
        )

        encoded_inputs = encoded_inputs[0]

        inputs_ids = encoded_inputs["input_ids"]
        position_ids = encoded_inputs["position_ids"]
        attention_mask = encoded_inputs["attention_mask"]

        # Index of the [SEP] token that closes the prompt, and the first padded
        # position, which marks the end of the content.
        q_sep_index = inputs_ids.index(c_sep_id, 1)
        c_sep_index = attention_mask.index(0)

        offset_mapping = [list(x) for x in encoded_inputs["offset_mapping"]]

        # The offsets of the text_pair restart at 0, so shift the content offsets
        # by the prompt length (plus its closing [SEP]) to obtain character
        # offsets into "prompt + content".
        bias = 0
        for index in range(len(offset_mapping)):
            if index == 0:
                continue
            mapping = offset_mapping[index]
            if mapping[0] == 0 and mapping[1] == 0 and bias == 0:
                bias = offset_mapping[index - 1][-1] + 1
            if mapping[0] == 0 and mapping[1] == 0:
                continue
            offset_mapping[index][0] += bias
            offset_mapping[index][1] += bias

        offset_bias = bias

        bbox_list = [[0, 0, 0, 0] for x in range(len(inputs_ids))]
        token_type_ids = [
            1 if token_index <= q_sep_index or token_index > c_sep_index else 0 for token_index in range(max_seq_len)
        ]
        padded_image = np.zeros([3, 224, 224])

    # Doc
    else:
        inputs_ids = []
        prev_bbox = [-1, -1, -1, -1]
        this_text_line = ""
        q_sep_index = -1
        offset_mapping = []
        last_offset = 0
        # Group consecutive characters that share the same bounding box into one
        # text segment and encode the segments one by one.
        for char_index, (char, bbox) in enumerate(zip(content, bbox_lines)):
            if char_index == 0:
                prev_bbox = bbox
                this_text_line = char
                continue

            if all([bbox[x] == prev_bbox[x] for x in range(4)]):
                this_text_line += char
            else:
                offset_mapping, last_offset, q_sep_index, inputs_ids = _encode_doc(
                    tokenizer,
                    offset_mapping,
                    last_offset,
                    prompt,
                    this_text_line,
                    inputs_ids,
                    q_sep_index,
                    max_seq_len,
                )
                this_text_line = char
                prev_bbox = bbox

        if len(this_text_line) > 0:
            offset_mapping, last_offset, q_sep_index, inputs_ids = _encode_doc(
                tokenizer, offset_mapping, last_offset, prompt, this_text_line, inputs_ids, q_sep_index, max_seq_len
            )

        if len(inputs_ids) > max_seq_len:
            inputs_ids = inputs_ids[: (max_seq_len - 1)] + [c_sep_id]
            offset_mapping = offset_mapping[: (max_seq_len - 1)] + [[0, 0]]
        else:
            inputs_ids += [c_sep_id]
            offset_mapping += [[0, 0]]

        offset_bias = offset_mapping[q_sep_index - 1][-1] + 1

        seq_len = len(inputs_ids)
        inputs_ids += [pad_id] * (max_seq_len - seq_len)
        token_type_ids = [1] * (q_sep_index + 1) + [0] * (seq_len - q_sep_index - 1)
        token_type_ids += [pad_id] * (max_seq_len - seq_len)

        bbox_list = _process_bbox(inputs_ids, bbox_lines, offset_mapping, offset_bias)

        offset_mapping += [[0, 0]] * (max_seq_len - seq_len)

        position_ids = list(range(seq_len))
        position_ids = position_ids + [0] * (max_seq_len - seq_len)
        attention_mask = [1] * seq_len + [0] * (max_seq_len - seq_len)

        image_data = base64.b64decode(image_buff_string.encode("utf8"))
        padded_image = pad_image_data(image_data)

    # Label vectors marking the start / end token of every answer span.
    start_ids = np.array([0.0 for x in range(max_seq_len)], dtype="int64")
    end_ids = np.array([0.0 for x in range(max_seq_len)], dtype="int64")

    for item in example["result_list"]:
        start = map_offset(item["start"] + offset_bias, offset_mapping)
        end = map_offset(item["end"] - 1 + offset_bias, offset_mapping)
        start_ids[start] = 1.0
        end_ids[end] = 1.0

    assert len(inputs_ids) == max_seq_len
    assert len(token_type_ids) == max_seq_len
    assert len(position_ids) == max_seq_len
    assert len(attention_mask) == max_seq_len
    assert len(bbox_list) == max_seq_len
    tokenized_output = {
        "input_ids": inputs_ids,
        "token_type_ids": token_type_ids,
        "position_ids": position_ids,
        "attention_mask": attention_mask,
        "bbox": bbox_list,
        "image": padded_image,
        "start_positions": start_ids,
        "end_positions": end_ids,
    }
    return tokenized_output
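

# Hedged usage sketch (not part of the original file): in the training scripts this
# conversion is usually bound to a tokenizer with `functools.partial` and applied to a
# MapDataset. The checkpoint name "uie-base" is an assumption for illustration only.
def _convert_dataset_sketch(dataset, max_seq_len=512):
    from functools import partial

    from paddlenlp.transformers import AutoTokenizer

    tokenizer = AutoTokenizer.from_pretrained("uie-base")
    trans_fn = partial(convert_example, tokenizer=tokenizer, max_seq_len=max_seq_len)
    # MapDataset.map runs the conversion over every raw example yielded by `reader`.
    return dataset.map(trans_fn)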


def _process_bbox(tokens, bbox_lines, offset_mapping, offset_bias):
    """
    Map every character-level bounding box in ``bbox_lines`` onto the token that
    contains that character, producing one box per token (``[0, 0, 0, 0]`` for
    special and padding tokens).
    """
    bbox_list = [[0, 0, 0, 0] for x in range(len(tokens))]

    for index, bbox in enumerate(bbox_lines):
        index_token = map_offset(index + offset_bias, offset_mapping)
        if 0 <= index_token < len(bbox_list):
            bbox_list[index_token] = bbox
    return bbox_list


def _encode_doc(tokenizer, offset_mapping, last_offset, prompt, this_text_line, inputs_ids, q_sep_index, max_seq_len):
    """
    Incrementally tokenize one OCR text segment and append its token ids and
    character offsets to the running ``inputs_ids`` / ``offset_mapping``. The first
    call also encodes the prompt; later calls only encode the new segment.
    """
    if len(offset_mapping) == 0:
        # First segment: encode the prompt together with the segment so that the
        # prompt tokens and their offsets are produced once.
        content_encoded_inputs = tokenizer(
            text=[prompt],
            text_pair=[this_text_line],
            max_seq_len=max_seq_len,
            return_dict=False,
            return_offsets_mapping=True,
        )
        content_encoded_inputs = content_encoded_inputs[0]
        inputs_ids = content_encoded_inputs["input_ids"][:-1]
        sub_offset_mapping = [list(x) for x in content_encoded_inputs["offset_mapping"]]
        # Token id 2 is the [SEP] token of the ERNIE-style vocabulary used by UIE.
        q_sep_index = content_encoded_inputs["input_ids"].index(2, 1)

        # Shift the text_pair offsets (which restart at 0) by the prompt length so
        # that all offsets refer to "prompt + content".
        bias = 0
        for i in range(len(sub_offset_mapping)):
            if i == 0:
                continue
            mapping = sub_offset_mapping[i]
            if mapping[0] == 0 and mapping[1] == 0 and bias == 0:
                bias = sub_offset_mapping[i - 1][-1] + 1
            if mapping[0] == 0 and mapping[1] == 0:
                continue
            if mapping == sub_offset_mapping[i - 1]:
                continue
            sub_offset_mapping[i][0] += bias
            sub_offset_mapping[i][1] += bias

        offset_mapping = sub_offset_mapping[:-1]
        last_offset = offset_mapping[-1][-1]
    else:
        # Later segments: encode only the new text and drop the special tokens the
        # tokenizer adds before appending the ids and offsets.
        content_encoded_inputs = tokenizer(
            text=this_text_line, max_seq_len=max_seq_len, return_dict=False, return_offsets_mapping=True
        )
        inputs_ids += content_encoded_inputs["input_ids"][1:-1]
        sub_offset_mapping = [list(x) for x in content_encoded_inputs["offset_mapping"]]

        for i, sub_list in enumerate(sub_offset_mapping[1:-1]):
            if i == 0:
                org_offset = sub_list[1]
            else:
                if sub_list[0] != org_offset and sub_offset_mapping[1:-1][i - 1] != sub_list:
                    last_offset += 1
                org_offset = sub_list[1]
            offset_mapping += [[last_offset, sub_list[1] - sub_list[0] + last_offset]]
            last_offset = offset_mapping[-1][-1]
    return offset_mapping, last_offset, q_sep_index, inputs_ids
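

# Minimal end-to-end sketch (not part of the original file; it assumes the "uie-base"
# checkpoint can be downloaded and that token id 2 is its [SEP] id). It builds one
# in-memory text-only example instead of reading a dataset file.
if __name__ == "__main__":
    from paddlenlp.transformers import AutoTokenizer

    tokenizer = AutoTokenizer.from_pretrained("uie-base")
    example = {
        "content": "2022年语文期末考试成绩出来了，小明得了92分。",
        # Character offsets into `content`, end exclusive: "92分" spans [21, 24).
        "result_list": [{"text": "92分", "start": 21, "end": 24}],
        "prompt": "成绩",
    }
    features = convert_example(example, tokenizer, max_seq_len=64)
    print({key: np.array(value).shape for key, value in features.items()})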