# Source code for toolregistry.hub.file_ops

"""
file_ops.py - Atomic file operations toolkit for LLM agents

Key features:
- All methods are static for stateless usage
- Atomic writes with automatic backups
- Unified error handling
- Diff/patch and git conflict support
- Structured data parsing
"""

import difflib
import fnmatch
import os
import re
from typing import Dict, List, Union


class FileOps:
    """Core file operations toolkit designed for LLM agent integration.

    Handles file reading, atomic writing, appending, searching, and
    diff-based modifications. Every public method is a ``@staticmethod``;
    the class is used purely as a namespace and keeps no state.
    """

    # ======================
    #  Content Modification
    # ======================

    @staticmethod
    def replace_by_diff(path: str, diff: str) -> None:
        """Apply unified diff format changes atomically to a file.

        Hunks are applied by position: the ``@@ -start,count`` header decides
        where a hunk begins, context lines copy the file's own line, ``-``
        lines drop a line and ``+`` lines insert one. The text of context
        and removed lines is not compared against the file. Anything outside
        a hunk (such as the ``---``/``+++`` headers) is ignored.

        Args:
            path: The file path to modify.
            diff: Unified diff text (must use standard format with ---/+++
                headers and @@ hunk markers).

        Example diff text::

            --- a/original_file
            +++ b/modified_file
            @@ -1,3 +1,3 @@
             line1
            -line2
            +line2 modified
             line3

        Raises:
            ValueError: On invalid diff format or patch failure (malformed
                hunk header, unknown line prefix, overlapping hunks, or a
                hunk that reaches past the end of the file).
        """
        original = FileOps.read_file(path)
        original_lines = original.splitlines(keepends=True)
        diff_lines = diff.splitlines(keepends=True)

        patched_lines: List[str] = []
        orig_pos = 0
        # Index in patched_lines of an added line whose terminator was removed
        # on purpose by a "No newline at end of file" marker (-1: none).
        no_eol_index = -1
        hunk_regex = re.compile(r"@@ -(\d+),?(\d*) \+(\d+),?(\d*) @@")

        i = 0
        while i < len(diff_lines):
            line = diff_lines[i]
            if not line.startswith("@@"):
                # Header or stray text outside any hunk.
                i += 1
                continue

            m = hunk_regex.match(line)
            if not m:
                raise ValueError("Invalid diff hunk header")
            start, count = int(m.group(1)), m.group(2)
            if count and int(count) == 0:
                # A zero-length range names the line *preceding* the hunk,
                # so the insertion point is right after line `start`.
                orig_start = start
            else:
                orig_start = max(start - 1, 0)
            if orig_start < orig_pos:
                raise ValueError("Diff hunks overlap or are out of order")

            # Add unchanged lines before hunk
            patched_lines.extend(original_lines[orig_pos:orig_start])
            orig_pos = orig_start
            i += 1

            prev_prefix = ""
            while i < len(diff_lines) and not diff_lines[i].startswith("@@"):
                hline = diff_lines[i]
                body = hline.rstrip("\r\n")
                if hline.startswith("\\"):
                    # "\ No newline at end of file" refers to the previous
                    # diff line; only added lines need adjusting because
                    # context lines are copied verbatim from the file.
                    if prev_prefix == "+" and patched_lines:
                        patched_lines[-1] = patched_lines[-1].rstrip("\r\n")
                        no_eol_index = len(patched_lines) - 1
                elif hline.startswith(" ") or not body:
                    # An empty diff line is a blank context line whose
                    # leading space was stripped by an editor or an LLM.
                    if orig_pos < len(original_lines):
                        patched_lines.append(original_lines[orig_pos])
                        orig_pos += 1
                    elif body.strip():
                        raise ValueError(
                            "Diff context extends past the end of the file"
                        )
                elif hline.startswith("-"):
                    if orig_pos >= len(original_lines):
                        raise ValueError(
                            "Diff removes a line past the end of the file"
                        )
                    orig_pos += 1
                elif hline.startswith("+"):
                    patched_lines.append(hline[1:])
                else:
                    raise ValueError(f"Invalid diff line: {hline}")
                prev_prefix = hline[:1]
                i += 1

        # Add remaining lines after last hunk
        patched_lines.extend(original_lines[orig_pos:])

        # A diff built with "\n".join(...) has no terminator on its final
        # line. If that line ends the file, keep the file's trailing newline
        # unless the diff explicitly asked for none.
        last = len(patched_lines) - 1
        if (
            last >= 0
            and last != no_eol_index
            and not patched_lines[last].endswith(("\n", "\r"))
            and original.endswith(("\n", "\r"))
        ):
            patched_lines[last] += "\n"

        FileOps.write_file(path, FileOps._join_lines(patched_lines))

    @staticmethod
    def search_files(
        path: str,
        regex: str,
        file_pattern: str = "*",
        context_radius: int = 2,
    ) -> List[dict]:
        """Perform regex search across files in a directory, returning matches with context.

        Args:
            path: The directory path to search recursively.
            regex: The regex pattern to search for.
            file_pattern: Glob pattern to filter file names (default='*').
            context_radius: Number of lines before and after each match to
                include as context (default=2). Negative values count as 0.

        Returns:
            List of dicts with keys:
                - file: file path
                - line_num: line number of match (1-based)
                - line: matched line content
                - context: list of context lines (tuples of line_num, line content),
                  excluding the matched line itself

        Raises:
            re.error: If ``regex`` is not a valid pattern.
        """
        pattern = re.compile(regex)
        radius = max(context_radius, 0)
        results: List[dict] = []

        for root, _dirs, files in os.walk(path):
            for filename in files:
                if not fnmatch.fnmatch(filename, file_pattern):
                    continue
                file_path = os.path.join(root, filename)
                try:
                    with open(
                        file_path, "r", encoding="utf-8", errors="replace"
                    ) as f:
                        lines = f.readlines()
                except (OSError, UnicodeError):
                    # Best effort: unreadable files are skipped, not fatal.
                    continue

                for i, line in enumerate(lines):
                    if not pattern.search(line):
                        continue
                    first = max(0, i - radius)
                    stop = min(len(lines), i + radius + 1)
                    context_lines = [
                        (ln + 1, lines[ln].rstrip("\n"))
                        for ln in range(first, stop)
                        if ln != i
                    ]
                    results.append(
                        {
                            "file": file_path,
                            "line_num": i + 1,
                            "line": line.rstrip("\n"),
                            "context": context_lines,
                        }
                    )

        return results

    @staticmethod
    def replace_by_git(path: str, diff: str) -> None:
        """Apply git conflict style diff atomically to a file, replacing conflicted sections.

        Each ``<<<<<<<`` / ``=======`` / ``>>>>>>>`` block is a search-and-
        replace instruction: the lines before the separator are looked up in
        the file (first occurrence at or after the end of the previous
        replacement, compared without line endings) and are replaced by the
        lines after the separator. Lines outside a block are optional
        context: when such a line equals the next unprocessed file line the
        search position moves past it, otherwise it is ignored. This lets
        ``diff`` be either a bare list of blocks or a full copy of the file
        with conflict markers embedded.

        Args:
            path: File path to modify.
            diff: Git conflict style diff text (using <<<<<<< SEARCH, =======,
                >>>>>>> REPLACE markers).

        Example diff text::

            <<<<<<< SEARCH
            line2
            =======
            line2 modified
            >>>>>>> REPLACE

        Raises:
            ValueError: On invalid diff format (missing separator or end
                marker) or patch failure (search text not found).
        """
        original = FileOps.read_file(path)
        original_lines = original.splitlines(keepends=True)
        diff_lines = diff.splitlines(keepends=True)

        conflict_start_re = re.compile(r"<<<<<<<.*")
        # Anchored so that content such as "==========" is not a separator.
        conflict_sep_re = re.compile(r"=======\s*$")
        conflict_end_re = re.compile(r">>>>>>>.*")

        # Line-ending-free copy of the file used for all comparisons.
        comparable = [text.rstrip("\r\n") for text in original_lines]

        patched_lines: List[str] = []
        orig_pos = 0
        total = len(diff_lines)
        i = 0
        while i < total:
            line = diff_lines[i]
            if not conflict_start_re.match(line):
                # Context line: advance only when it really mirrors the file.
                if (
                    orig_pos < len(original_lines)
                    and comparable[orig_pos] == line.rstrip("\r\n")
                ):
                    patched_lines.append(original_lines[orig_pos])
                    orig_pos += 1
                i += 1
                continue

            i += 1  # skip conflict start line
            search_lines: List[str] = []
            while i < total and not conflict_sep_re.match(diff_lines[i]):
                search_lines.append(diff_lines[i].rstrip("\r\n"))
                i += 1
            if i >= total:
                raise ValueError("Conflict block is missing the '=======' separator")
            i += 1  # skip separator line

            replacement_lines: List[str] = []
            while i < total and not conflict_end_re.match(diff_lines[i]):
                replacement_lines.append(diff_lines[i])
                i += 1
            if i >= total:
                raise ValueError("Conflict block is missing the '>>>>>>>' end marker")
            i += 1  # skip conflict end line

            found = FileOps._find_block(comparable, search_lines, orig_pos)
            if found < 0:
                raise ValueError("Search block not found in file")

            # Keep everything between the cursor and the match, then swap
            # the matched lines for the replacement.
            patched_lines.extend(original_lines[orig_pos:found])
            patched_lines.extend(replacement_lines)
            orig_pos = found + len(search_lines)

        # Add remaining lines after last conflict
        patched_lines.extend(original_lines[orig_pos:])

        FileOps.write_file(path, FileOps._join_lines(patched_lines))

    # ======================
    #  File I/O Operations
    # ======================

    @staticmethod
    def read_file(path: str) -> str:
        """Read text file content.

        The file is decoded as UTF-8 with ``errors="replace"``, so bytes that
        are not valid UTF-8 come back as U+FFFD instead of raising.

        Args:
            path: File path to read

        Returns:
            File content as string

        Raises:
            FileNotFoundError: If path doesn't exist
            OSError: On other I/O failures (permissions, path is a directory, ...)
        """
        with open(path, "r", encoding="utf-8", errors="replace") as f:
            return f.read()

    @staticmethod
    def write_file(path: str, content: str) -> None:
        """Atomically write content to a text file (overwrite). Creates the file if it doesn't exist.

        The content is first written to ``<path>.tmp`` and then moved over
        ``path`` with ``os.replace``. If anything fails, the temporary file
        is removed and the error is re-raised.

        Args:
            path: Destination file path
            content: Content to write

        Raises:
            OSError: If the temporary file cannot be written or moved.
        """
        tmp_path = f"{path}.tmp"
        try:
            with open(tmp_path, "w", encoding="utf-8") as f:
                f.write(content)
            os.replace(tmp_path, path)
        except BaseException:
            # Do not leave a stale temp file behind; the original error is
            # what the caller needs to see, so cleanup problems are dropped.
            try:
                os.remove(tmp_path)
            except OSError:
                pass
            raise

    @staticmethod
    def append_file(path: str, content: str) -> None:
        """Append content to a text file. Creates the file if it doesn't exist.

        Unlike ``write_file`` this writes to the target directly.

        Args:
            path: Destination file path
            content: Content to append
        """
        # Use 'a' mode for appending, creates file if it doesn't exist
        with open(path, "a", encoding="utf-8") as f:
            f.write(content)

    # ======================
    #  Content Generation
    # ======================

    @staticmethod
    def make_diff(ours: str, theirs: str) -> str:
        """Generate unified diff text between two strings.

        Both inputs are split into lines without their terminators, and the
        diff lines are joined with ``"\\n"``; the result therefore has no
        trailing newline, and file names in the ---/+++ headers are empty.

        Args:
            ours: The 'ours' version string.
            theirs: The 'theirs' version string.

        Note:
            Intended for comparison/visualization, not for direct text
            modification tasks.

        Returns:
            Unified diff text (empty string when the inputs have equal lines)
        """
        return "\n".join(
            difflib.unified_diff(ours.splitlines(), theirs.splitlines(), lineterm="")
        )

    @staticmethod
    def make_git_conflict(ours: str, theirs: str) -> str:
        """Generate git merge conflict marker text between two strings.

        Args:
            ours: The 'ours' version string.
            theirs: The 'theirs' version string.

        Note:
            Intended for comparison/visualization, not for direct text
            modification tasks.

        Returns:
            Text with conflict markers (``<<<<<<< HEAD`` / ``=======`` /
            ``>>>>>>> incoming``)
        """
        return f"<<<<<<< HEAD\n{ours}\n=======\n{theirs}\n>>>>>>> incoming\n"

    # ======================
    #  Safety Utilities
    # ======================

    @staticmethod
    def validate_path(path: str) -> Dict[str, Union[bool, str]]:
        """Validate file path safety (checks for empty paths, dangerous characters).

        A leading ``~`` is expanded before the character check. The file
        system is not consulted, so the path need not exist.

        Args:
            path: The path string to validate.

        Returns:
            Dictionary with keys:
                - valid (bool): Path safety status
                - message (str): Description if invalid
        """
        if not path:
            return {"valid": False, "message": "Empty path"}

        if "~" in path:
            path = os.path.expanduser(path)

        if any(c in path for c in '*?"><|'):
            return {"valid": False, "message": "Contains dangerous characters"}

        return {"valid": True, "message": ""}

    # ======================
    #  Internal Helpers
    # ======================

    @staticmethod
    def _find_block(haystack: List[str], needle: List[str], start: int) -> int:
        """Return the first index >= start where needle occurs in haystack, or -1."""
        span = len(needle)
        for idx in range(start, len(haystack) - span + 1):
            if haystack[idx : idx + span] == needle:
                return idx
        return -1

    @staticmethod
    def _join_lines(lines: List[str]) -> str:
        """Join lines, terminating every non-final line so that none get merged."""
        last = len(lines) - 1
        return "".join(
            text + "\n"
            if idx < last and not text.endswith(("\n", "\r"))
            else text
            for idx, text in enumerate(lines)
        )