// taskNameShouldEscape reports whether the byte c has to be hex-escaped by
// taskNameEscape. ASCII letters, ASCII digits and '-' are passed through
// unchanged; every other byte is escaped. That includes '_': it is the
// escape marker itself, so it must be escaped to keep the encoding
// unambiguous.
func taskNameShouldEscape(c byte) bool {
	switch {
	case 'a' <= c && c <= 'z', 'A' <= c && c <= 'Z', '0' <= c && c <= '9':
		return false
	case c == '-':
		return false
	}
	return true
}

// taskNameEscape hex-escapes the bytes of s that are not allowed in
// appengine task names. It works like URL percent-encoding, but uses '_'
// instead of '%' as the marker, followed by two upper-case hex digits
// (for example ':' becomes "_3A"). Because '_' is the marker, a literal '_'
// in s is escaped as well ("_5F").
//
// Documented task name format:
// https://cloud.google.com/appengine/docs/go/taskqueue#Go_Task_names
// [0-9a-zA-Z\-\_]{1,500}
//
// Escaping is done per byte, so a multi-byte UTF-8 character produces one
// "_XX" group per byte. The function does not try to enforce the length
// limit; an empty input yields an empty result and a long input yields a
// result up to three times as long. If nothing needs escaping, s itself is
// returned without allocating.
//
// The golang implementation of taskqueue does not enforce any of these
// rules, but it seems prudent to comply unless Google changes the
// documentation.
func taskNameEscape(s string) string {
	const hexDigits = "0123456789ABCDEF"

	// First pass: count the bytes to escape so the output can be sized
	// exactly (each escaped byte grows from one byte to three).
	escapeCount := 0
	for i := 0; i < len(s); i++ {
		if taskNameShouldEscape(s[i]) {
			escapeCount++
		}
	}
	if escapeCount == 0 {
		return s
	}

	// Second pass: copy safe bytes through and expand the rest to "_XX".
	out := make([]byte, 0, len(s)+2*escapeCount)
	for i := 0; i < len(s); i++ {
		c := s[i]
		if taskNameShouldEscape(c) {
			out = append(out, '_', hexDigits[c>>4], hexDigits[c&15])
			continue
		}
		out = append(out, c)
	}
	return string(out)
}
q.Run(appengine.Timeout(c, time.Minute)) tc := make(chan *taskqueue.Task) done := make(chan bool) i := 0 u := routeUrl("update-feed") + var feed Feed + var id string + go taskSender(c, "update-feed", tc, done) for { - k, err := it.Next(nil) + k, err := it.Next(&feed) if err == datastore.Done { break - } else if err != nil { + } + if err != nil { c.Errorf("next error: %v", err.Error()) break } - tc <- taskqueue.NewPOSTTask(u, url.Values{ - "feed": {k.StringID()}, - }) + id = k.StringID() + // To guard against queuing duplicate feeds, + // use the feed id (URL) as the task name. + // https://cloud.google.com/appengine/docs/go/taskqueue#Go_Task_names + // This is not an absolute guarantee, but should help. + newTask := taskqueue.NewPOSTTask(u, url.Values{"feed": {id}}) + // Include the NextUpdate time because task names endure 7D or so. + // The URL is hex-escaped but hopefully still human-readable. + newTask.Name = fmt.Sprintf("%v_%v", + feed.NextUpdate.UTC().Format("2006-01-02T15-04-05Z07-00"), + taskNameEscape(id)) + c.Debugf("queuing feed %v", newTask.Name) + tc <- newTask i++ } close(tc) diff --git a/utils.go b/utils.go index 989248ec..3e5f59e6 100644 --- a/utils.go +++ b/utils.go @@ -424,7 +424,7 @@ func ParseFeed(c appengine.Context, contentType, origUrl, fetchUrl string, body return parseFix(c, feed, stories, fetchUrl) } -func parseAtom(c appengine.Context, body []byte, charsetReader func(string, io.Reader) (io.Reader, error)) (*Feed, []*Story, error) { +func parseAtom(c appengine.Context, body []byte, charsetReader func(string, io.Reader) (io.Reader, error)) (*Feed, []*Story, error) { var f Feed var s []*Story var err error @@ -822,8 +822,18 @@ func taskSender(c mpg.Context, queue string, tc chan *taskqueue.Task, done chan const taskLimit = 100 tasks := make([]*taskqueue.Task, 0, taskLimit) send := func() { - taskqueue.AddMulti(c, tasks, queue) - c.Infof("added %v tasks", len(tasks)) + errorCount := 0 + queued, err := taskqueue.AddMulti(c, tasks, queue) + if 
me, ok := err.(appengine.MultiError); ok { + for i, merr := range me { + if merr == nil { + continue + } + errorCount++ + c.Warningf("error for task %v: %v", queued[i], merr) + } + } + c.Infof("added %v tasks with %v errors", len(tasks), errorCount) tasks = tasks[0:0] } for t := range tc {