Yes, you can use the SentenceSplitter in your LlamaIndex pipeline to handle paragraphs and lists of items without cutting them in the middle. The SentenceSplitter is designed to parse text with a preference for complete sentences, and it tries to keep sentences and paragraphs together. Its chunk size, chunk overlap, separator, paragraph separator, and secondary chunking regex are all customizable, which allows it to handle text in a way that respects the boundaries of sentences, paragraphs, and list items.
Here's an example of how you can initialize and use the SentenceSplitter:
from llama_index.core.node_parser import SentenceSplitter

splitter = SentenceSplitter(
    chunk_size=1024,  # Adjust the chunk size according to your needs
    chunk_overlap=20,  # Adjust the overlap size according to your needs
    paragraph_separator="\n\n",  # Assuming paragraphs are separated by two newlines
    # You can adjust the secondary_chunking_regex to better handle list items if needed
)

# Assuming `documents` is a list of text documents you want to split
nodes = splitter.get_nodes_from_documents(documents)
The SentenceSplitter will attempt to split the text while respecting the boundaries of sentences, paragraphs, and potentially list items, depending on how you configure the paragraph_separator and secondary_chunking_regex. This makes it suitable for use cases where maintaining the integrity of paragraphs and list items is important.
For more details on the SentenceSplitter, you can refer to its implementation and documentation in the SentenceSplitter source code, reproduced below.
class SentenceSplitter(MetadataAwareTextSplitter):
    """Parse text with a preference for complete sentences.

    In general, this class tries to keep sentences and paragraphs together.
    Therefore compared to the original TokenTextSplitter, there are less likely
    to be hanging sentences or parts of sentences at the end of the node chunk.
    """

    chunk_size: int = Field(
        default=DEFAULT_CHUNK_SIZE,
        description="The token chunk size for each chunk.",
        gt=0,
    )
    chunk_overlap: int = Field(
        default=SENTENCE_CHUNK_OVERLAP,
        description="The token overlap of each chunk when splitting.",
        gte=0,
    )
    separator: str = Field(
        default=" ", description="Default separator for splitting into words"
    )
    paragraph_separator: str = Field(
        default=DEFAULT_PARAGRAPH_SEP, description="Separator between paragraphs."
    )
    secondary_chunking_regex: str = Field(
        default=CHUNKING_REGEX, description="Backup regex for splitting into sentences."
    )

    _chunking_tokenizer_fn: Callable[[str], List[str]] = PrivateAttr()
    _tokenizer: Callable = PrivateAttr()
    _split_fns: List[Callable] = PrivateAttr()
    _sub_sentence_split_fns: List[Callable] = PrivateAttr()

    def __init__(
        self,
        separator: str = " ",
        chunk_size: int = DEFAULT_CHUNK_SIZE,
        chunk_overlap: int = SENTENCE_CHUNK_OVERLAP,
        tokenizer: Optional[Callable] = None,
        paragraph_separator: str = DEFAULT_PARAGRAPH_SEP,
        chunking_tokenizer_fn: Optional[Callable[[str], List[str]]] = None,
        secondary_chunking_regex: str = CHUNKING_REGEX,
        callback_manager: Optional[CallbackManager] = None,
        include_metadata: bool = True,
        include_prev_next_rel: bool = True,
        id_func: Optional[Callable[[int, Document], str]] = None,
    ):
        """Initialize with parameters."""
        if chunk_overlap > chunk_size:
            raise ValueError(
                f"Got a larger chunk overlap ({chunk_overlap}) than chunk size "
                f"({chunk_size}), should be smaller."
            )
        id_func = id_func or default_id_func
        callback_manager = callback_manager or CallbackManager([])
        self._chunking_tokenizer_fn = (
            chunking_tokenizer_fn or split_by_sentence_tokenizer()
        )
        self._tokenizer = tokenizer or get_tokenizer()

        self._split_fns = [
            split_by_sep(paragraph_separator),
            self._chunking_tokenizer_fn,
        ]
        self._sub_sentence_split_fns = [
            split_by_regex(secondary_chunking_regex),
            split_by_sep(separator),
            split_by_char(),
        ]

        super().__init__(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            secondary_chunking_regex=secondary_chunking_regex,
            separator=separator,
            paragraph_separator=paragraph_separator,
            callback_manager=callback_manager,
            include_metadata=include_metadata,
            include_prev_next_rel=include_prev_next_rel,
            id_func=id_func,
        )

    @classmethod
    def from_defaults(
        cls,
        separator: str = " ",
        chunk_size: int = DEFAULT_CHUNK_SIZE,
        chunk_overlap: int = SENTENCE_CHUNK_OVERLAP,
        tokenizer: Optional[Callable] = None,
        paragraph_separator: str = DEFAULT_PARAGRAPH_SEP,
        chunking_tokenizer_fn: Optional[Callable[[str], List[str]]] = None,
        secondary_chunking_regex: str = CHUNKING_REGEX,
        callback_manager: Optional[CallbackManager] = None,
        include_metadata: bool = True,
        include_prev_next_rel: bool = True,
    ) -> "SentenceSplitter":
        """Initialize with parameters."""
        callback_manager = callback_manager or CallbackManager([])
        return cls(
            separator=separator,
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            tokenizer=tokenizer,
            paragraph_separator=paragraph_separator,
            chunking_tokenizer_fn=chunking_tokenizer_fn,
            secondary_chunking_regex=secondary_chunking_regex,
            callback_manager=callback_manager,
            include_metadata=include_metadata,
            include_prev_next_rel=include_prev_next_rel,
        )

    @classmethod
    def class_name(cls) -> str:
        return "SentenceSplitter"

    def split_text_metadata_aware(self, text: str, metadata_str: str) -> List[str]:
        metadata_len = len(self._tokenizer(metadata_str))
        effective_chunk_size = self.chunk_size - metadata_len
        if effective_chunk_size <= 0:
            raise ValueError(
                f"Metadata length ({metadata_len}) is longer than chunk size "
                f"({self.chunk_size}). Consider increasing the chunk size or "
                "decreasing the size of your metadata to avoid this."
            )
        elif effective_chunk_size < 50:
            print(
                f"Metadata length ({metadata_len}) is close to chunk size "
                f"({self.chunk_size}). Resulting chunks are less than 50 tokens. "
                "Consider increasing the chunk size or decreasing the size of "
                "your metadata to avoid this.",
                flush=True,
            )

        return self._split_text(text, chunk_size=effective_chunk_size)

    def split_text(self, text: str) -> List[str]:
        return self._split_text(text, chunk_size=self.chunk_size)

    def _split_text(self, text: str, chunk_size: int) -> List[str]:
        """
        _Split incoming text and return chunks with overlap size.

        Has a preference for complete sentences, phrases, and minimal overlap.
        """
        if text == "":
            return [text]

        with self.callback_manager.event(
            CBEventType.CHUNKING, payload={EventPayload.CHUNKS: [text]}
        ) as event:
            splits = self._split(text, chunk_size)
            chunks = self._merge(splits, chunk_size)

            event.on_end(payload={EventPayload.CHUNKS: chunks})

        return chunks

    def _split(self, text: str, chunk_size: int) -> List[_Split]:
        r"""Break text into splits that are smaller than chunk size.

        The order of splitting is:
        1. split by paragraph separator
        2. split by chunking tokenizer (default is nltk sentence tokenizer)
        3. split by second chunking regex (default is "[^,\.;]+[,\.;]?")
        4. split by default separator (" ")
        """
        token_size = self._token_size(text)
        if self._token_size(text) <= chunk_size:
            return [_Split(text, is_sentence=True, token_size=token_size)]

        text_splits_by_fns, is_sentence = self._get_splits_by_fns(text)

        text_splits = []
        for text_split_by_fns in text_splits_by_fns:
            token_size = self._token_size(text_split_by_fns)
            if token_size <= chunk_size:
                text_splits.append(
                    _Split(
                        text_split_by_fns,
                        is_sentence=is_sentence,
                        token_size=token_size,
                    )
                )
            else:
                recursive_text_splits = self._split(
                    text_split_by_fns, chunk_size=chunk_size
                )
                text_splits.extend(recursive_text_splits)
        return text_splits

    def _merge(self, splits: List[_Split], chunk_size: int) -> List[str]:
        """Merge splits into chunks."""
        chunks: List[str] = []
        cur_chunk: List[Tuple[str, int]] = []  # list of (text, length)
        last_chunk: List[Tuple[str, int]] = []
        cur_chunk_len = 0
        new_chunk = True

        def close_chunk() -> None:
            nonlocal chunks, cur_chunk, last_chunk, cur_chunk_len, new_chunk

            chunks.append("".join([text for text, length in cur_chunk]))
            last_chunk = cur_chunk
            cur_chunk = []
            cur_chunk_len = 0
            new_chunk = True

            # add overlap to the next chunk using the last one first
            # there is a small issue with this logic. If the chunk directly after
            # the overlap is really big, then we could go over the chunk_size, and
            # in theory the correct thing to do would be to remove some/all of the
            # overlap. However, it would complicate the logic further without
            # much real world benefit, so it's not implemented now.
            if len(last_chunk) > 0:
                last_index = len(last_chunk) - 1
                while (
                    last_index >= 0
                    and cur_chunk_len + last_chunk[last_index][1] <= self.chunk_overlap
                ):
                    text, length = last_chunk[last_index]
                    cur_chunk_len += length
                    cur_chunk.insert(0, (text, length))
                    last_index -= 1

        while len(splits) > 0:
            cur_split = splits[0]
            if cur_split.token_size > chunk_size:
                raise ValueError("Single token exceeded chunk size")
            if cur_chunk_len + cur_split.token_size > chunk_size and not new_chunk:
                # if adding split to current chunk exceeds chunk size: close out chunk
                close_chunk()
            else:
                if (
                    cur_split.is_sentence
                    or cur_chunk_len + cur_split.token_size <= chunk_size
                    or new_chunk  # new chunk, always add at least one split
                ):
                    # add split to chunk
                    cur_chunk_len += cur_split.token_size
                    cur_chunk.append((cur_split.text, cur_split.token_size))
                    splits.pop(0)
                    new_chunk = False
                else:
                    # close out chunk
                    close_chunk()

        # handle the last chunk
        if not new_chunk:
            chunk = "".join([text for text, length in cur_chunk])
            chunks.append(chunk)

        # run postprocessing to remove blank spaces
        return self._postprocess_chunks(chunks)

    def _postprocess_chunks(self, chunks: List[str]) -> List[str]:
        """Post-process chunks.

        Remove whitespace only chunks and remove leading and trailing whitespace.
        """
        new_chunks = []
        for chunk in chunks:
            stripped_chunk = chunk.strip()
            if stripped_chunk == "":
                continue
            new_chunks.append(stripped_chunk)
        return new_chunks

    def _token_size(self, text: str) -> int:
        return len(self._tokenizer(text))

    def _get_splits_by_fns(self, text: str) -> Tuple[List[str], bool]:
        for split_fn in self._split_fns:
            splits = split_fn(text)
            if len(splits) > 1:
                return splits, True
            break

        for split_fn in self._sub_sentence_split_fns:
            splits = split_fn(text)
            if len(splits) > 1:
                break

        return splits, False
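As a quick illustration of that splitting order, here is a minimal sketch; the sample text, chunk size, and overlap are made-up values, not anything from the LlamaIndex docs. With a chunk size smaller than the whole text, the splitter falls back to the paragraph separator first, so each paragraph comes back as its own chunk.

from llama_index.core.node_parser import SentenceSplitter

splitter = SentenceSplitter(chunk_size=16, chunk_overlap=4)

text = (
    "First paragraph. It has two sentences.\n\n"
    "Second paragraph. It also has two sentences."
)

# The full text exceeds 16 tokens, so it is split at the paragraph
# separator first; each paragraph fits and becomes one chunk.
for chunk in splitter.split_text(text):
    print(repr(chunk))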
The CodeSplitter splits raw code-text based on the language it is written in.
Check the full list of supported languages here.
from llama_index.core.node_parser import CodeSplitter

splitter = CodeSplitter(
    language="python",
    chunk_lines=40,  # lines per chunk
    chunk_lines_overlap=15,  # lines overlap between chunks
    max_chars=1500,  # max chars per chunk
)
nodes = splitter.get_nodes_from_documents(documents)
You can also wrap any existing text splitter from langchain with a node parser.
from langchain.text_splitter import RecursiveCharacterTextSplitter
from llama_index.core.node_parser import LangchainNodeParser

parser = LangchainNodeParser(RecursiveCharacterTextSplitter())
nodes = parser.get_nodes_from_documents(documents)
The SentenceSplitter attempts to split text while respecting the boundaries of sentences.
from llama_index.core.node_parser import SentenceSplitter

splitter = SentenceSplitter(
    chunk_size=1024,
    chunk_overlap=20,
)
nodes = splitter.get_nodes_from_documents(documents)
The SentenceWindowNodeParser is similar to other node parsers, except that it splits all documents into individual sentences. The resulting nodes also contain the surrounding "window" of sentences around each node in the metadata. Note that this metadata will not be visible to the LLM or embedding model.
This is most useful for generating embeddings that have a very specific scope. Then, combined with a MetadataReplacementNodePostProcessor, you can replace the sentence with its surrounding context before sending the node to the LLM.
An example of setting up the parser with default settings is below. In practice, you would usually only want to adjust the window size of sentences.
import nltk
from llama_index.core.node_parser import SentenceWindowNodeParser

node_parser = SentenceWindowNodeParser.from_defaults(
    # how many sentences on either side to capture
    window_size=3,
    # the metadata key that holds the window of surrounding sentences
    window_metadata_key="window",
    # the metadata key that holds the original sentence
    original_text_metadata_key="original_sentence",
)
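After parsing, each node holds a single sentence, and the surrounding window is available in its metadata under the keys configured above. A brief sketch, assuming `documents` has already been loaded:

nodes = node_parser.get_nodes_from_documents(documents)

# The node text is a single sentence; the surrounding sentences are
# stored in metadata (not sent to the LLM/embedding model by default).
print(nodes[0].text)
print(nodes[0].metadata["window"])
print(nodes[0].metadata["original_sentence"])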
A full example can be found here in combination with the MetadataReplacementNodePostProcessor.
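As a rough sketch of that combination: the index construction and query below are assumptions for illustration, and the postprocessor class is exposed as MetadataReplacementPostProcessor in llama_index.core.postprocessor.

from llama_index.core import VectorStoreIndex
from llama_index.core.postprocessor import MetadataReplacementPostProcessor

# Index the sentence nodes produced by the SentenceWindowNodeParser above.
index = VectorStoreIndex(nodes)

# At query time, replace each retrieved sentence with its "window" metadata
# so the LLM sees the surrounding context instead of a lone sentence.
query_engine = index.as_query_engine(
    similarity_top_k=2,
    node_postprocessors=[
        MetadataReplacementPostProcessor(target_metadata_key="window")
    ],
)
response = query_engine.query("Your question here")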
"Semantic chunking" is a new concept proposed Greg Kamradt in his video tutorial on 5 levels of embedding chunking: https://youtu.be/8OJC21T2SL4?t=1933.
Instead of chunking text with a fixed chunk size, the semantic splitter adaptively picks the breakpoint in-between sentences using embedding similarity. This ensures that a "chunk" contains sentences that are semantically related to each other.
We adapted it into a LlamaIndex module.
Check out our notebook below!
Caveats:
from llama_index.core.node_parser import SemanticSplitterNodeParser
from llama_index.embeddings.openai import OpenAIEmbedding

embed_model = OpenAIEmbedding()
splitter = SemanticSplitterNodeParser(
    buffer_size=1, breakpoint_percentile_threshold=95, embed_model=embed_model
)
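As with the other splitters, you can then build nodes from your documents (a one-line sketch; `documents` is assumed to be loaded already):

# Each resulting node groups adjacent, semantically similar sentences.
nodes = splitter.get_nodes_from_documents(documents)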
A full example can be found in our guide on using the SemanticSplitterNodeParser.
The TokenTextSplitter attempts to split to a consistent chunk size according to raw token counts.
from llama_index.core.node_parser import TokenTextSplitter

splitter = TokenTextSplitter(
    chunk_size=1024,
    chunk_overlap=20,
    separator=" ",
)
nodes = splitter.get_nodes_from_documents(documents)
def split_text(self, text: str) -> List[str]:
    """Split text into sentences."""
    return self._lc_splitter.split_text(text)
class CodeSplitter(TextSplitter):
    """Split code using a AST parser.

    Thank you to Kevin Lu / SweepAI for suggesting this elegant code splitting solution.
    https://docs.sweep.dev/blogs/chunking-2m-files
    """

    language: str = Field(
        description="The programming language of the code being split."
    )
    chunk_lines: int = Field(
        default=DEFAULT_CHUNK_LINES,
        description="The number of lines to include in each chunk.",
        gt=0,
    )
    chunk_lines_overlap: int = Field(
        default=DEFAULT_LINES_OVERLAP,
        description="How many lines of code each chunk overlaps with.",
        gt=0,
    )
    max_chars: int = Field(
        default=DEFAULT_MAX_CHARS,
        description="Maximum number of characters per chunk.",
        gt=0,
    )
    _parser: Any = PrivateAttr()

    def __init__(
        self,
        language: str,
        chunk_lines: int = DEFAULT_CHUNK_LINES,
        chunk_lines_overlap: int = DEFAULT_LINES_OVERLAP,
        max_chars: int = DEFAULT_MAX_CHARS,
        parser: Any = None,
        callback_manager: Optional[CallbackManager] = None,
        include_metadata: bool = True,
        include_prev_next_rel: bool = True,
        id_func: Optional[Callable[[int, Document], str]] = None,
    ) -> None:
        """Initialize a CodeSplitter."""
        from tree_sitter import Parser

        if parser is None:
            try:
                import tree_sitter_languages

                parser = tree_sitter_languages.get_parser(language)
            except ImportError:
                raise ImportError(
                    "Please install tree_sitter_languages to use CodeSplitter."
                    "Or pass in a parser object."
                )
            except Exception:
                print(
                    f"Could not get parser for language {language}. Check "
                    "https://github.com/grantjenks/py-tree-sitter-languages#license "
                    "for a list of valid languages."
                )
                raise
        if not isinstance(parser, Parser):
            raise ValueError("Parser must be a tree-sitter Parser object.")

        self._parser = parser

        callback_manager = callback_manager or CallbackManager([])
        id_func = id_func or default_id_func

        super().__init__(
            language=language,
            chunk_lines=chunk_lines,
            chunk_lines_overlap=chunk_lines_overlap,
            max_chars=max_chars,
            callback_manager=callback_manager,
            include_metadata=include_metadata,
            include_prev_next_rel=include_prev_next_rel,
            id_func=id_func,
        )

    @classmethod
    def from_defaults(
        cls,
        language: str,
        chunk_lines: int = DEFAULT_CHUNK_LINES,
        chunk_lines_overlap: int = DEFAULT_LINES_OVERLAP,
        max_chars: int = DEFAULT_MAX_CHARS,
        callback_manager: Optional[CallbackManager] = None,
        parser: Any = None,
    ) -> "CodeSplitter":
        """Create a CodeSplitter with default values."""
        return cls(
            language=language,
            chunk_lines=chunk_lines,
            chunk_lines_overlap=chunk_lines_overlap,
            max_chars=max_chars,
            parser=parser,
        )

    @classmethod
    def class_name(cls) -> str:
        return "CodeSplitter"

    def _chunk_node(self, node: Any, text: str, last_end: int = 0) -> List[str]:
        new_chunks = []
        current_chunk = ""
        for child in node.children:
            if child.end_byte - child.start_byte > self.max_chars:
                # Child is too big, recursively chunk the child
                if len(current_chunk) > 0:
                    new_chunks.append(current_chunk)
                current_chunk = ""
                new_chunks.extend(self._chunk_node(child, text, last_end))
            elif (
                len(current_chunk) + child.end_byte - child.start_byte > self.max_chars
            ):
                # Child would make the current chunk too big, so start a new chunk
                new_chunks.append(current_chunk)
                current_chunk = text[last_end : child.end_byte]
            else:
                current_chunk += text[last_end : child.end_byte]
            last_end = child.end_byte
        if len(current_chunk) > 0:
            new_chunks.append(current_chunk)
        return new_chunks

    def split_text(self, text: str) -> List[str]:
        """Split incoming code and return chunks using the AST."""
        with self.callback_manager.event(
            CBEventType.CHUNKING, payload={EventPayload.CHUNKS: [text]}
        ) as event:
            tree = self._parser.parse(bytes(text, "utf-8"))

            if (
                not tree.root_node.children
                or tree.root_node.children[0].type != "ERROR"
            ):
                chunks = [
                    chunk.strip() for chunk in self._chunk_node(tree.root_node, text)
                ]
                event.on_end(
                    payload={EventPayload.CHUNKS: chunks},
                )

                return chunks
            else:
                raise ValueError(f"Could not parse code with language {self.language}.")

        # TODO: set up auto-language detection using something like https://github.com/yoeo/guesslang.
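Since __init__ accepts a pre-built parser object, you can also construct one yourself and pass it in. This is only a sketch; it assumes tree_sitter_languages is installed and that `documents` holds your source files:

import tree_sitter_languages
from llama_index.core.node_parser import CodeSplitter

# Build a tree-sitter parser explicitly instead of letting CodeSplitter
# look it up from the language name.
ts_parser = tree_sitter_languages.get_parser("typescript")

splitter = CodeSplitter(language="typescript", parser=ts_parser)
nodes = splitter.get_nodes_from_documents(documents)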
def text_splitter(self, text_splitter: NodeParser) -> None:
    """Set the text splitter."""
    self.node_parser = text_splitter
class TextSplitter(NodeParser):
    @abstractmethod
    def split_text(self, text: str) -> List[str]:
        ...

    def split_texts(self, texts: List[str]) -> List[str]:
        nested_texts = [self.split_text(text) for text in texts]
        return [item for sublist in nested_texts for item in sublist]

    def _parse_nodes(
        self, nodes: Sequence[BaseNode], show_progress: bool = False, **kwargs: Any
    ) -> List[BaseNode]:
        all_nodes: List[BaseNode] = []
        nodes_with_progress = get_tqdm_iterable(nodes, show_progress, "Parsing nodes")
        for node in nodes_with_progress:
            splits = self.split_text(node.get_content())
            all_nodes.extend(
                build_nodes_from_splits(splits, node, id_func=self.id_func)
            )
        return all_nodes
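Because _parse_nodes builds nodes from whatever split_text returns, you can get a custom splitter by subclassing TextSplitter and implementing only split_text. A toy sketch; the class name and splitting rule are made up for illustration, and `documents` is assumed to be loaded:

from typing import List

from llama_index.core.node_parser import TextSplitter


class ParagraphSplitter(TextSplitter):
    """Toy splitter: one chunk per blank-line-separated paragraph."""

    def split_text(self, text: str) -> List[str]:
        return [p.strip() for p in text.split("\n\n") if p.strip()]


nodes = ParagraphSplitter().get_nodes_from_documents(documents)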