src / swarm / worker.ts

/**
 * @file swarm/worker.ts
 * A single swarm worker with:
 * - Multi-engine search (DDG + Brave + Scholar + SearXNG + Mojeek)
 * - Query mutation: auto-rephrase when results are sparse
 * - Recursive link crawling (configurable depth 1-3)
 * - Cross-worker discovery sharing via SharedCrawlState
 * - All limits read from SwarmTask (depth-profile overrides)
 */

import {
  searchDDG,
  searchDDGPaginated,
  DdgRateLimiter,
  sharedDdgLimiter,
} from "../net/ddg";
import { multiEngineSearch, SearchEngine } from "../net/search-engines";
import { fetchPage } from "../net/http";
import {
  extractPage,
  contentFingerprint,
  computeRelevance,
} from "../net/extractor";
import { isPdfUrl, isPdfContentType, extractPdf } from "../net/pdf-extractor";
import {
  scoreCandidate,
  rankCandidates,
  scoreOutlinks,
} from "../scoring/authority";
import {
  SwarmTask,
  WorkerResult,
  CrawledSource,
  ScoredCandidate,
  SourceTier,
  StatusFn,
  WarnFn,
} from "../types";
import { harvestLocalSources } from "../local/search";
import {
  BATCH_INTER_FETCH_DELAY_MS,
  MIN_USEFUL_WORD_COUNT,
} from "../constants";
import { sleep } from "../net/http";

export interface SharedCrawlState {
  readonly visitedUrls: ReadonlySet<string>;
  readonly contentHashes: ReadonlySet<string>;
  readonly domainCounts: ReadonlyMap<string, number>;
  addVisited(url: string): void;
  addHash(hash: string): void;
  incrementDomain(url: string): void;
  domainCount(url: string): number;
  pushDiscovery(url: string, title: string, fromWorker: string): void;
  drainDiscoveries(
    limit: number,
  ): ReadonlyArray<{ url: string; title: string }>;
}

const MUTATION_STRATEGIES: ReadonlyArray<(q: string) => string> = [
  (q) => `"${q}"`,
  (q) => `${q} explained`,
  (q) => `${q} research 2024-2026+`,
  (q) => q.split(" ").slice(0, 4).join(" "),
  (q) => `${q} guide overview`,
  (q) => q.replace(/\b(how|what|why|when)\b/gi, "").trim(),
];

function mutateQuery(query: string, attempt: number): string | null {
  if (attempt >= MUTATION_STRATEGIES.length) return null;
  const mutated = MUTATION_STRATEGIES[attempt](query);
  return mutated && mutated !== query && mutated.length > 3 ? mutated : null;
}

export async function runWorker(
  task: SwarmTask,
  state: SharedCrawlState,
  signal: AbortSignal,
  status: StatusFn,
  warn: WarnFn,
  topicKws: ReadonlyArray<string> = [],
  limiter: DdgRateLimiter = sharedDdgLimiter,
): Promise<WorkerResult> {
  const sources: CrawledSource[] = [];
  const errors: string[] = [];
  const queriesExecuted: string[] = [];

  const roleTag = `[${task.label}]`;
  status(
    `${roleTag} Starting — ${task.queries.length} queries, budget: ${task.pageBudget} pages` +
    (task.extraEngines.length > 0
      ? `, engines: DDG+${task.extraEngines.join("+")}`
      : ""),
  );

  if (task.enableLocalSources) {
    const localBudget = Math.max(2, Math.ceil(task.pageBudget * 0.3));
    const localSources = harvestLocalSources(
      task.queries,
      task.role,
      task.label,
      localBudget,
      task.contentLimit,
      task.localCollectionIds,
      task.roleCollectionMap,
    );

    if (localSources.length > 0) {
      for (const src of localSources) {
        state.addVisited(src.url);
        const fp = contentFingerprint(src.text);
        state.addHash(fp);
        sources.push(src);
      }
      status(
        `${roleTag} Local sources: ${localSources.length} chunks from document collections`,
      );
    }
  }

  const allHits: Array<{
    url: string;
    title: string;
    snippet: string;
    query: string;
  }> = [];

  for (const query of task.queries) {
    if (signal.aborted) break;

    let ddgHits: ReadonlyArray<import("../types").SearchHit> = [];

    try {
      if (task.searchPages > 1) {
        ddgHits = await searchDDGPaginated(
          query,
          task.searchResultsPerQuery,
          task.searchPages,
          task.safeSearch,
          signal,
          limiter,
        );
      } else {
        ddgHits = await searchDDG(
          query,
          task.searchResultsPerQuery,
          task.safeSearch,
          signal,
          limiter,
        );
      }
      for (const h of ddgHits) allHits.push({ ...h, query });
      queriesExecuted.push(query);
      status(
        `${roleTag} DDG: "${query}" → ${ddgHits.length} results${task.searchPages > 1 ? ` (${task.searchPages}pg)` : ""}`,
      );
    } catch (err: unknown) {
      if (isAbortError(err)) break;
      warn(`${roleTag} DDG failed: "${query}" — ${errorMessage(err)}`);
      errors.push(`ddg:"${query}": ${errorMessage(err)}`);
    }

    if (ddgHits.length < task.queryMutationThreshold && !signal.aborted) {
      for (let attempt = 0; attempt < 2; attempt++) {
        const mutated = mutateQuery(query, attempt);
        if (!mutated) break;
        try {
          const mutHits = await searchDDG(
            mutated,
            task.searchResultsPerQuery,
            task.safeSearch,
            signal,
            limiter,
          );
          if (mutHits.length > ddgHits.length) {
            for (const h of mutHits) allHits.push({ ...h, query: mutated });
            queriesExecuted.push(mutated);
            status(
              `${roleTag} Mutated: "${mutated}" → ${mutHits.length} results`,
            );
            break;
          }
        } catch {
          break;
        }
      }
    }

    if (task.extraEngines.length > 0 && !signal.aborted) {
      try {
        const extraHits = await multiEngineSearch(
          query,
          Math.min(task.searchResultsPerQuery, 8),
          task.extraEngines as ReadonlyArray<SearchEngine>,
          signal,
          () => limiter,
        );
        for (const h of extraHits) allHits.push({ ...h, query });
        if (extraHits.length > 0) {
          status(
            `${roleTag} +${task.extraEngines.join("+")}${extraHits.length} extra results`,
          );
        }
      } catch {
        /* non-fatal */
      }
    }
  }

  if (signal.aborted || allHits.length === 0) {
    return {
      taskId: task.id,
      role: task.role,
      label: task.label,
      sources,
      queries: queriesExecuted,
      errors,
    };
  }

  const deduped = deduplicateByUrl(allHits);
  const scored = deduped.map((h) => scoreCandidate(h, h.query));
  const filtered = task.preferredTiers
    ? scored.filter((c) => task.preferredTiers!.includes(c.tier))
    : scored;

  const poolSize = task.pageBudget * task.candidatePoolMultiplier;
  const candidates = rankCandidates(
    filtered.length > 0 ? filtered : scored,
    poolSize,
  );

  status(
    `${roleTag} ${candidates.length} candidates ranked (from ${allHits.length} hits across ${task.extraEngines.length + 1} engine(s))`,
  );

  await fetchBatch(
    candidates,
    task,
    state,
    signal,
    status,
    warn,
    sources,
    errors,
    roleTag,
    topicKws,
  );

  if (
    task.followLinks &&
    sources.length > 0 &&
    sources.length < task.pageBudget
  ) {
    for (let depth = 1; depth <= task.linkCrawlDepth; depth++) {
      if (sources.length >= task.pageBudget || signal.aborted) break;

      const budget = Math.min(
        task.pageBudget - sources.length,
        task.maxLinksToFollow,
      );
      if (budget <= 0) break;

      const sourcesForLinks =
        depth === 1 ? sources : sources.slice(-budget * 2);
      const newCount = await followLinks(
        sourcesForLinks,
        task,
        state,
        signal,
        status,
        warn,
        sources,
        errors,
        roleTag,
        budget,
        topicKws,
        depth,
      );

      if (newCount === 0) break; // no new sources at this depth, stop going deeper
    }
  }

  for (const src of sources) {
    for (const link of src.outlinks.slice(0, 5)) {
      state.pushDiscovery(link.href, link.text, task.label);
    }
  }

  if (sources.length < task.pageBudget && !signal.aborted) {
    const discoveries = state.drainDiscoveries(
      Math.min(5, task.pageBudget - sources.length),
    );
    if (discoveries.length > 0) {
      status(
        `${roleTag} Picking up ${discoveries.length} cross-worker discoveries…`,
      );
      const discCandidates = discoveries.map((d) =>
        scoreCandidate(
          { url: d.url, title: d.title, snippet: "" },
          task.queries[0] ?? "",
        ),
      );
      await fetchBatch(
        discCandidates,
        { ...task, pageBudget: sources.length + discoveries.length },
        state,
        signal,
        status,
        warn,
        sources,
        errors,
        roleTag,
        topicKws,
      );
    }
  }

  status(`${roleTag} Done — ${sources.length} sources collected`);
  return {
    taskId: task.id,
    role: task.role,
    label: task.label,
    sources,
    queries: queriesExecuted,
    errors,
  };
}

async function fetchBatch(
  candidates: ReadonlyArray<ScoredCandidate>,
  task: SwarmTask,
  state: SharedCrawlState,
  signal: AbortSignal,
  status: StatusFn,
  warn: WarnFn,
  results: CrawledSource[],
  errors: string[],
  tag: string,
  topicKws: ReadonlyArray<string>,
): Promise<void> {
  let idx = 0;
  const concurrency = task.workerConcurrency;
  const domainCap = task.maxPagesPerDomain;
  const minRelevance = task.minRelevanceScore;

  while (
    results.length < task.pageBudget &&
    idx < candidates.length &&
    !signal.aborted
  ) {
    const batch = candidates
      .slice(idx, idx + concurrency)
      .filter(
        (c) =>
          !state.visitedUrls.has(c.url) && state.domainCount(c.url) < domainCap,
      );
    idx += concurrency;

    if (batch.length === 0) continue;

    for (const c of batch) state.addVisited(c.url);

    const settled = await Promise.allSettled(
      batch.map((c) =>
        fetchAndExtract(c.url, c.query, c.snippet, task, topicKws, signal),
      ),
    );

    for (let i = 0; i < settled.length; i++) {
      const candidate = batch[i];
      const result = settled[i];

      if (signal.aborted) return;

      if (result.status === "rejected") {
        if (!isAbortError(result.reason)) {
          warn(
            `${tag} Failed: ${truncUrl(candidate.url)}${errorMessage(result.reason)}`,
          );
          errors.push(`fetch:${candidate.url}: ${errorMessage(result.reason)}`);
        }
        continue;
      }

      const page = result.value;
      if (page.wordCount < MIN_USEFUL_WORD_COUNT) continue;

      if (page.relevanceScore < minRelevance) {
        status(
          `${tag} Skipped (off-topic, rel=${page.relevanceScore.toFixed(2)}): ${truncUrl(candidate.url)}`,
        );
        continue;
      }

      const fp = contentFingerprint(page.text);
      if (state.contentHashes.has(fp)) {
        status(`${tag} Skipped duplicate: ${truncUrl(candidate.url)}`);
        continue;
      }

      state.addHash(fp);
      state.incrementDomain(candidate.url);
      results.push(page);
      status(
        `${tag} [${results.length}/${task.pageBudget}] (rel=${page.relevanceScore.toFixed(2)}) ${page.title.slice(0, 60)}`,
      );

      if (results.length >= task.pageBudget) return;
    }

    if (idx < candidates.length && results.length < task.pageBudget) {
      await sleep(BATCH_INTER_FETCH_DELAY_MS);
    }
  }
}

async function followLinks(
  existingSources: ReadonlyArray<CrawledSource>,
  task: SwarmTask,
  state: SharedCrawlState,
  signal: AbortSignal,
  status: StatusFn,
  warn: WarnFn,
  results: CrawledSource[],
  errors: string[],
  tag: string,
  budget: number,
  topicKws: ReadonlyArray<string>,
  depth: number,
): Promise<number> {
  const allLinks = existingSources.flatMap((s) => s.outlinks);
  const linkKws = task.queries
    .join(" ")
    .toLowerCase()
    .split(/\s+/)
    .filter((w) => w.length > 3)
    .slice(0, 12);

  const scored = scoreOutlinks(
    allLinks,
    linkKws,
    state.visitedUrls,
    task.maxLinksToEvaluate,
  );

  const toFollow = scored.slice(0, task.maxLinksToFollow);
  if (toFollow.length === 0) return 0;

  status(`${tag} Following ${toFollow.length} link(s) (depth ${depth})…`);

  const before = results.length;
  const linkCandidates = toFollow.map((l) =>
    scoreCandidate(
      { url: l.href, title: "", snippet: "" },
      task.queries[0] ?? "",
    ),
  );

  await fetchBatch(
    linkCandidates,
    { ...task, pageBudget: results.length + budget },
    state,
    signal,
    status,
    warn,
    results,
    errors,
    tag,
    topicKws,
  );

  return results.length - before;
}

async function fetchAndExtract(
  url: string,
  query: string,
  snippet: string,
  task: SwarmTask,
  topicKws: ReadonlyArray<string>,
  signal: AbortSignal,
): Promise<CrawledSource> {
  const fetchResult = await fetchPage(url, signal);
  const { finalUrl } = fetchResult;

  const isPdf =
    (fetchResult.rawBuffer && isPdfContentType(fetchResult.contentType)) ||
    (!fetchResult.rawBuffer && isPdfUrl(url));

  let page;
  if (isPdf && fetchResult.rawBuffer) {
    page = await extractPdf(
      fetchResult.rawBuffer,
      url,
      finalUrl,
      task.contentLimit,
      false,
    );
  } else if (isPdf && fetchResult.html && fetchResult.html.startsWith("%PDF")) {
    const buf = Buffer.from(fetchResult.html, "binary");
    page = await extractPdf(buf, url, finalUrl, task.contentLimit, false);
  } else {
    page = extractPage(
      fetchResult.html,
      url,
      finalUrl,
      task.contentLimit,
      task.maxOutlinksPerPage,
    );
  }

  const { domainScore, freshnessScore, tier } = scoreCandidate(
    { url, title: page.title, snippet: page.description },
    query,
  );

  const relevanceScore = computeRelevance(
    page.text,
    page.title,
    snippet,
    topicKws,
  );

  return {
    url: page.url,
    finalUrl: page.finalUrl,
    title: page.title,
    description: page.description,
    published: page.published,
    text: page.text,
    wordCount: page.wordCount,
    outlinks: page.outlinks,
    sourceQuery: query,
    workerRole: task.role,
    workerLabel: task.label,
    domainScore,
    freshnessScore,
    tier: tier as SourceTier,
    relevanceScore,
    origin: "web" as const,
  };
}

function deduplicateByUrl<T extends { url: string }>(items: T[]): T[] {
  const seen = new Set<string>();
  return items.filter((item) => {
    if (seen.has(item.url)) return false;
    seen.add(item.url);
    return true;
  });
}

function isAbortError(err: unknown): boolean {
  return err instanceof DOMException && err.name === "AbortError";
}

function errorMessage(err: unknown): string {
  return err instanceof Error ? err.message : String(err ?? "unknown");
}

function truncUrl(url: string, max = 70): string {
  return url.length > max ? url.slice(0, max) + "…" : url;
}